Compare commits
43 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c161d84fdf | ||
|
|
495b553285 | ||
|
|
21b770ba8b | ||
|
|
e9f94e0767 | ||
|
|
644ada1da9 | ||
|
|
0c40250849 | ||
|
|
1d1134a86d | ||
|
|
28e7664eb7 | ||
|
|
50f3ad641c | ||
|
|
cc7cf5f8c5 | ||
|
|
339f0f6e94 | ||
|
|
f558e43342 | ||
|
|
e374e5c90c | ||
|
|
563b775e49 | ||
|
|
de9e1a4515 | ||
|
|
f64b36f17a | ||
|
|
f0e8c82d31 | ||
|
|
5770d43230 | ||
|
|
d4944c236f | ||
|
|
33c761a187 | ||
|
|
d7e6da8d2c | ||
|
|
44d1a2415c | ||
|
|
c98ff50f06 | ||
|
|
8835fcb09e | ||
|
|
77c56e58c0 | ||
|
|
72c65ca4ee | ||
|
|
ab019b0bdc | ||
|
|
9709e45ad2 | ||
|
|
be1f80003c | ||
|
|
252fcca383 | ||
|
|
04ae8fa4a0 | ||
|
|
c95bd7776a | ||
|
|
8219167d05 | ||
|
|
e0a6881343 | ||
|
|
6e985d7f06 | ||
|
|
66719b05dd | ||
|
|
7197583fea | ||
|
|
ce29024eef | ||
|
|
e1ac67f7fa | ||
|
|
01812144dd | ||
|
|
1c34e49629 | ||
|
|
f233fbfb25 | ||
|
|
5387115e4a |
@@ -1 +1,2 @@
|
||||
* `global.yaml` - 全局配置
|
||||
* `api.template.yaml` - API相关配置模板
|
||||
* `cluster.template.yaml` - 通过集群自动接入节点模板
|
||||
11
internal/apps/main.go
Normal file
11
internal/apps/main.go
Normal file
@@ -0,0 +1,11 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package apps
|
||||
|
||||
import teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
|
||||
func RunMain(f func()) {
|
||||
if teaconst.IsMain {
|
||||
f()
|
||||
}
|
||||
}
|
||||
@@ -7,9 +7,9 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fnv"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -450,7 +450,7 @@ func (this *FileList) UpgradeV3(oldDir string, brokenOnError bool) error {
|
||||
remotelogs.Println("CACHE", "upgrading local database finished")
|
||||
}()
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+indexDBPath+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
db, err := dbs.OpenWriter("file:" + indexDBPath + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
package caches
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
@@ -82,14 +81,15 @@ func (this *FileListDB) Open(dbPath string) error {
|
||||
}
|
||||
|
||||
// write db
|
||||
writeDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize)+"&_secure_delete=FAST")
|
||||
// 这里不能加 EXCLUSIVE 锁,不然异步事务可能会失败
|
||||
writeDB, err := dbs.OpenWriter("file:" + dbPath + "?cache=private&mode=rwc&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize) + "&_secure_delete=FAST")
|
||||
if err != nil {
|
||||
return errors.New("open write database failed: " + err.Error())
|
||||
}
|
||||
|
||||
writeDB.SetMaxOpenConns(1)
|
||||
|
||||
this.writeDB = dbs.NewDB(writeDB)
|
||||
this.writeDB = writeDB
|
||||
|
||||
// TODO 耗时过长,暂时不整理数据库
|
||||
// TODO 需要根据行数来判断是否VACUUM
|
||||
@@ -109,7 +109,7 @@ func (this *FileListDB) Open(dbPath string) error {
|
||||
}
|
||||
}
|
||||
|
||||
this.writeBatch = dbs.NewBatch(writeDB, 4)
|
||||
this.writeBatch = dbs.NewBatch(writeDB.RawDB(), 4)
|
||||
this.writeBatch.OnFail(func(err error) {
|
||||
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error()+" ("+filepath.Base(this.dbPath)+")")
|
||||
})
|
||||
@@ -124,14 +124,14 @@ func (this *FileListDB) Open(dbPath string) error {
|
||||
}
|
||||
|
||||
// read db
|
||||
readDB, err := sql.Open("sqlite3", "file:"+dbPath+"?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size="+types.String(cacheSize))
|
||||
readDB, err := dbs.OpenReader("file:" + dbPath + "?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=" + types.String(cacheSize))
|
||||
if err != nil {
|
||||
return errors.New("open read database failed: " + err.Error())
|
||||
}
|
||||
|
||||
readDB.SetMaxOpenConns(runtime.NumCPU())
|
||||
|
||||
this.readDB = dbs.NewDB(readDB)
|
||||
this.readDB = readDB
|
||||
|
||||
if teaconst.EnableDBStat {
|
||||
this.readDB.EnableStat(true)
|
||||
|
||||
@@ -3,6 +3,7 @@ package caches
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
@@ -14,6 +15,10 @@ import (
|
||||
var SharedManager = NewManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventQuit, func() {
|
||||
remotelogs.Println("CACHE", "quiting cache manager")
|
||||
SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{})
|
||||
@@ -172,10 +177,15 @@ func (this *Manager) TotalDiskSize() int64 {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
total := int64(0)
|
||||
var total = int64(0)
|
||||
for _, storage := range this.storageMap {
|
||||
total += storage.TotalDiskSize()
|
||||
}
|
||||
|
||||
if total < 0 {
|
||||
total = 0
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
|
||||
@@ -508,7 +508,11 @@ func (this *MemoryStorage) flushItem(key string) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if !item.IsDone || item.IsExpired() {
|
||||
if !item.IsDone {
|
||||
remotelogs.Error("CACHE", "flush items failed: open writer failed: item has not been done")
|
||||
return
|
||||
}
|
||||
if item.IsExpired() {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
var sharedBrotliReaderPool *ReaderPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
var sharedDeflateReaderPool *ReaderPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
var sharedGzipReaderPool *ReaderPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
var sharedZSTDReaderPool *ReaderPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
var sharedBrotliWriterPool *WriterPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
var sharedDeflateWriterPool *WriterPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
var sharedGzipWriterPool *WriterPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import (
|
||||
var sharedZSTDWriterPool *WriterPool
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package teaconst
|
||||
|
||||
const (
|
||||
Version = "0.6.2"
|
||||
Version = "0.6.4.2"
|
||||
|
||||
ProductName = "Edge Node"
|
||||
ProcessName = "edge-node"
|
||||
|
||||
@@ -5,6 +5,7 @@ package teaconst
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -15,7 +16,7 @@ var (
|
||||
|
||||
NodeId int64 = 0
|
||||
NodeIdString = ""
|
||||
IsDaemon = len(os.Args) > 1 && os.Args[1] == "daemon"
|
||||
IsMain = checkMain()
|
||||
|
||||
GlobalProductName = nodeconfigs.DefaultProductName
|
||||
|
||||
@@ -24,3 +25,15 @@ var (
|
||||
|
||||
DiskIsFast = false // 是否为高速硬盘
|
||||
)
|
||||
|
||||
// 检查是否为主程序
|
||||
func checkMain() bool {
|
||||
if len(os.Args) == 1 ||
|
||||
(len(os.Args) >= 2 && os.Args[1] == "pprof") {
|
||||
return true
|
||||
}
|
||||
exe, _ := os.Executable()
|
||||
return strings.HasSuffix(exe, ".test") ||
|
||||
strings.HasSuffix(exe, ".test.exe") ||
|
||||
strings.Contains(exe, "___")
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/ddosconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/firewalls/nftables"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -27,6 +28,10 @@ import (
|
||||
var SharedDDoSProtectionManager = NewDDoSProtectionManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventReload, func() {
|
||||
if nftablesInstance == nil {
|
||||
return
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package firewalls
|
||||
|
||||
import (
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"runtime"
|
||||
@@ -14,6 +15,10 @@ var firewallLocker = &sync.Mutex{}
|
||||
|
||||
// 初始化
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
var firewall = Firewall()
|
||||
if firewall.Name() != "mock" {
|
||||
|
||||
@@ -24,7 +24,7 @@ import (
|
||||
|
||||
// check nft status, if being enabled we load it automatically
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ package nftables
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -17,6 +18,10 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventReload, func() {
|
||||
// linux only
|
||||
if runtime.GOOS != "linux" {
|
||||
|
||||
@@ -15,7 +15,7 @@ var instanceId = uint64(0)
|
||||
|
||||
// New 新创建goroutine
|
||||
func New(f func()) {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ func New(f func()) {
|
||||
|
||||
// NewWithArgs 创建带有参数的goroutine
|
||||
func NewWithArgs(f func(args ...interface{}), args ...interface{}) {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -3,28 +3,27 @@
|
||||
package iplibrary
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
type IPListDB struct {
|
||||
db *sql.DB
|
||||
db *dbs.DB
|
||||
|
||||
itemTableName string
|
||||
|
||||
deleteExpiredItemsStmt *sql.Stmt
|
||||
deleteItemStmt *sql.Stmt
|
||||
insertItemStmt *sql.Stmt
|
||||
selectItemsStmt *sql.Stmt
|
||||
selectMaxVersionStmt *sql.Stmt
|
||||
deleteExpiredItemsStmt *dbs.Stmt
|
||||
deleteItemStmt *dbs.Stmt
|
||||
insertItemStmt *dbs.Stmt
|
||||
selectItemsStmt *dbs.Stmt
|
||||
selectMaxVersionStmt *dbs.Stmt
|
||||
|
||||
cleanTicker *time.Ticker
|
||||
|
||||
@@ -56,7 +55,7 @@ func (this *IPListDB) init() error {
|
||||
|
||||
var path = this.dir + "/ip_list.db"
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ var SharedIPListManager = NewIPListManager()
|
||||
var IPListUpdateNotify = make(chan bool, 1)
|
||||
|
||||
func init() {
|
||||
if teaconst.IsDaemon {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ package metrics
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"strconv"
|
||||
@@ -13,6 +14,10 @@ import (
|
||||
var SharedManager = NewManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventQuit, func() {
|
||||
SharedManager.Quit()
|
||||
})
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
@@ -17,7 +16,6 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
@@ -50,11 +48,11 @@ type Task struct {
|
||||
|
||||
cleanVersion int32
|
||||
|
||||
insertStatStmt *sql.Stmt
|
||||
deleteByVersionStmt *sql.Stmt
|
||||
deleteByExpiresTimeStmt *sql.Stmt
|
||||
selectTopStmt *sql.Stmt
|
||||
sumStmt *sql.Stmt
|
||||
insertStatStmt *dbs.Stmt
|
||||
deleteByVersionStmt *dbs.Stmt
|
||||
deleteByExpiresTimeStmt *dbs.Stmt
|
||||
selectTopStmt *dbs.Stmt
|
||||
sumStmt *dbs.Stmt
|
||||
|
||||
serverIdMap map[int64]zero.Zero // 所有的服务Ids
|
||||
timeMap map[string]zero.Zero // time => bool
|
||||
@@ -92,12 +90,12 @@ func (this *Task) Init() error {
|
||||
|
||||
var path = dir + "/metric." + types.String(this.item.Id) + ".db"
|
||||
|
||||
db, err := sql.Open("sqlite3", "file:"+path+"?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF")
|
||||
db, err := dbs.OpenWriter("file:" + path + "?cache=shared&mode=rwc&_journal_mode=WAL&_sync=OFF&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.SetMaxOpenConns(1)
|
||||
this.db = dbs.NewDB(db)
|
||||
this.db = db
|
||||
|
||||
// 恢复数据库
|
||||
var recoverEnv, _ = os.LookupEnv("EdgeRecover")
|
||||
|
||||
@@ -5,6 +5,7 @@ package monitor
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -16,6 +17,10 @@ import (
|
||||
var SharedValueQueue = NewValueQueue()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
SharedValueQueue.Start()
|
||||
|
||||
@@ -89,6 +89,8 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
this.lastErr = errors.New("read error: " + err.Error())
|
||||
} else {
|
||||
this.lastErr = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -103,7 +105,7 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
// 设置读超时时间
|
||||
if this.isHTTP && !this.isWebsocket && !this.isShortReading && this.autoReadTimeout {
|
||||
if this.isHTTP && !this.isPersistent && !this.isShortReading && this.autoReadTimeout {
|
||||
this.setHTTPReadTimeout()
|
||||
}
|
||||
|
||||
@@ -143,12 +145,18 @@ func (this *ClientConn) Read(b []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (this *ClientConn) Write(b []byte) (n int, err error) {
|
||||
if len(b) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
if this.isDebugging {
|
||||
this.lastWriteAt = time.Now().Unix()
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
this.lastErr = errors.New("write error: " + err.Error())
|
||||
} else {
|
||||
this.lastErr = nil
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -164,18 +172,25 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
// 延长读超时时间
|
||||
if this.isHTTP && !this.isWebsocket && this.autoReadTimeout {
|
||||
if this.isHTTP && !this.isPersistent && this.autoReadTimeout {
|
||||
this.setHTTPReadTimeout()
|
||||
}
|
||||
|
||||
// 开始写入
|
||||
var before = time.Now()
|
||||
n, err = this.rawConn.Write(b)
|
||||
if n > 0 {
|
||||
// 统计当前服务带宽
|
||||
if this.serverId > 0 {
|
||||
if !this.isLO || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽
|
||||
atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
|
||||
stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n))
|
||||
|
||||
var cost = time.Since(before).Seconds()
|
||||
if cost > 1 {
|
||||
stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(float64(n)/cost), int64(n))
|
||||
} else {
|
||||
stats.SharedBandwidthStatManager.Add(this.userId, this.serverId, int64(n), int64(n))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -223,7 +238,7 @@ func (this *ClientConn) SetDeadline(t time.Time) error {
|
||||
|
||||
func (this *ClientConn) SetReadDeadline(t time.Time) error {
|
||||
// 如果开启了HTTP自动读超时选项,则自动控制超时时间
|
||||
if this.isHTTP && !this.isWebsocket && this.autoReadTimeout {
|
||||
if this.isHTTP && !this.isPersistent && this.autoReadTimeout {
|
||||
this.isShortReading = false
|
||||
|
||||
var unixTime = t.Unix()
|
||||
|
||||
@@ -16,7 +16,8 @@ type BaseClientConn struct {
|
||||
remoteAddr string
|
||||
hasLimit bool
|
||||
|
||||
isWebsocket bool
|
||||
isPersistent bool // 是否为持久化连接
|
||||
fingerprint []byte
|
||||
|
||||
isClosed bool
|
||||
|
||||
@@ -125,6 +126,16 @@ func (this *BaseClientConn) SetLinger(seconds int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *BaseClientConn) SetIsWebsocket(isWebsocket bool) {
|
||||
this.isWebsocket = isWebsocket
|
||||
func (this *BaseClientConn) SetIsPersistent(isPersistent bool) {
|
||||
this.isPersistent = isPersistent
|
||||
}
|
||||
|
||||
// SetFingerprint 设置指纹信息
|
||||
func (this *BaseClientConn) SetFingerprint(fingerprint []byte) {
|
||||
this.fingerprint = fingerprint
|
||||
}
|
||||
|
||||
// Fingerprint 读取指纹信息
|
||||
func (this *BaseClientConn) Fingerprint() []byte {
|
||||
return this.fingerprint
|
||||
}
|
||||
|
||||
@@ -24,6 +24,12 @@ type ClientConnInterface interface {
|
||||
// UserId 获取当前连接所属服务的用户ID
|
||||
UserId() int64
|
||||
|
||||
// SetIsWebsocket 设置是否为Websocket
|
||||
SetIsWebsocket(isWebsocket bool)
|
||||
// SetIsPersistent 设置是否为持久化
|
||||
SetIsPersistent(isPersistent bool)
|
||||
|
||||
// SetFingerprint 设置指纹信息
|
||||
SetFingerprint(fingerprint []byte)
|
||||
|
||||
// Fingerprint 读取指纹信息
|
||||
Fingerprint() []byte
|
||||
}
|
||||
|
||||
@@ -15,6 +15,10 @@ import (
|
||||
|
||||
// 发送监控流量
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventStart, func() {
|
||||
var ticker = time.NewTicker(1 * time.Minute)
|
||||
goman.New(func() {
|
||||
|
||||
@@ -56,15 +56,29 @@ func (this *ClientTLSConn) SetWriteDeadline(t time.Time) error {
|
||||
return this.rawConn.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
func (this *ClientTLSConn) SetIsWebsocket(isWebsocket bool) {
|
||||
func (this *ClientTLSConn) SetIsPersistent(isPersistent bool) {
|
||||
tlsConn, ok := this.rawConn.(*tls.Conn)
|
||||
if ok {
|
||||
var rawConn = tlsConn.NetConn()
|
||||
if rawConn != nil {
|
||||
clientConn, ok := rawConn.(*ClientConn)
|
||||
if ok {
|
||||
clientConn.SetIsWebsocket(isWebsocket)
|
||||
clientConn.SetIsPersistent(isPersistent)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *ClientTLSConn) Fingerprint() []byte {
|
||||
tlsConn, ok := this.rawConn.(*tls.Conn)
|
||||
if ok {
|
||||
var rawConn = tlsConn.NetConn()
|
||||
if rawConn != nil {
|
||||
clientConn, ok := rawConn.(*ClientConn)
|
||||
if ok {
|
||||
return clientConn.fingerprint
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -25,9 +25,12 @@ type HTTPAccessLogQueue struct {
|
||||
// NewHTTPAccessLogQueue 获取新对象
|
||||
func NewHTTPAccessLogQueue() *HTTPAccessLogQueue {
|
||||
// 队列中最大的值,超出此数量的访问日志会被丢弃
|
||||
// TODO 需要可以在界面中设置
|
||||
maxSize := 20000
|
||||
queue := &HTTPAccessLogQueue{
|
||||
var maxSize = 2_000 * (1 + utils.SystemMemoryGB()/2)
|
||||
if maxSize > 20_000 {
|
||||
maxSize = 20_000
|
||||
}
|
||||
|
||||
var queue = &HTTPAccessLogQueue{
|
||||
queue: make(chan *pb.HTTPAccessLog, maxSize),
|
||||
}
|
||||
goman.New(func() {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/compressions"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -23,6 +24,10 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventStart, func() {
|
||||
goman.New(func() {
|
||||
SharedHTTPCacheTaskManager.Start()
|
||||
|
||||
@@ -221,6 +221,18 @@ func (this *HTTPRequest) Do() {
|
||||
}
|
||||
}
|
||||
|
||||
// CC
|
||||
if !isHealthCheck {
|
||||
if this.web.CC != nil {
|
||||
if this.web.CC.IsOn {
|
||||
if this.doCC() {
|
||||
this.doEnd()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WAF
|
||||
if this.web.FirewallRef != nil && this.web.FirewallRef.IsOn {
|
||||
if this.doWAFRequest() {
|
||||
@@ -572,6 +584,11 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
|
||||
this.web.UAM = web.UAM
|
||||
}
|
||||
|
||||
// CC
|
||||
if web.CC != nil && (web.CC.IsPrior || isTop) {
|
||||
this.web.CC = web.CC
|
||||
}
|
||||
|
||||
// 重写规则
|
||||
if len(web.RewriteRefs) > 0 {
|
||||
for index, ref := range web.RewriteRefs {
|
||||
@@ -728,6 +745,8 @@ func (this *HTTPRequest) Format(source string) string {
|
||||
return this.Path()
|
||||
case "requestPathExtension":
|
||||
return filepath.Ext(this.Path())
|
||||
case "requestPathLowerExtension":
|
||||
return strings.ToLower(filepath.Ext(this.Path()))
|
||||
case "requestLength":
|
||||
return strconv.FormatInt(this.requestLength(), 10)
|
||||
case "requestTime":
|
||||
@@ -841,7 +860,7 @@ func (this *HTTPRequest) Format(source string) string {
|
||||
}
|
||||
|
||||
// response.xxx.xxx
|
||||
dotIndex := strings.Index(suffix, ".")
|
||||
dotIndex = strings.Index(suffix, ".")
|
||||
if dotIndex < 0 {
|
||||
return "${" + varName + "}"
|
||||
}
|
||||
|
||||
8
internal/nodes/http_request_cc.go
Normal file
8
internal/nodes/http_request_cc.go
Normal file
@@ -0,0 +1,8 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build !plus
|
||||
|
||||
package nodes
|
||||
|
||||
func (this *HTTPRequest) doCC() (block bool) {
|
||||
return
|
||||
}
|
||||
@@ -73,6 +73,16 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// 设置为持久化连接
|
||||
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
|
||||
if requestConn == nil {
|
||||
return
|
||||
}
|
||||
requestClientConn, ok := requestConn.(ClientConnInterface)
|
||||
if ok {
|
||||
requestClientConn.SetIsPersistent(true)
|
||||
}
|
||||
|
||||
// 连接池配置
|
||||
poolSize := fastcgi.PoolSize
|
||||
if poolSize <= 0 {
|
||||
|
||||
@@ -67,7 +67,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
|
||||
|
||||
// 当前服务的独立设置
|
||||
if this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn {
|
||||
blocked, breakChecking := this.checkWAFRequest(this.web.FirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying)
|
||||
blocked, breakChecking := this.checkWAFRequest(this.web.FirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying, false)
|
||||
if blocked {
|
||||
return true
|
||||
}
|
||||
@@ -78,7 +78,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
|
||||
|
||||
// 公用的防火墙设置
|
||||
if this.ReqServer.HTTPFirewallPolicy != nil && this.ReqServer.HTTPFirewallPolicy.IsOn {
|
||||
blocked, breakChecking := this.checkWAFRequest(this.ReqServer.HTTPFirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying)
|
||||
blocked, breakChecking := this.checkWAFRequest(this.ReqServer.HTTPFirewallPolicy, forceLog, forceLogRequestBody, forceLogRegionDenying, this.web.FirewallRef.IgnoreGlobalRules)
|
||||
if blocked {
|
||||
return true
|
||||
}
|
||||
@@ -90,7 +90,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
|
||||
return
|
||||
}
|
||||
|
||||
func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, forceLog bool, logRequestBody bool, logDenying bool) (blocked bool, breakChecking bool) {
|
||||
func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, forceLog bool, logRequestBody bool, logDenying bool, ignoreRules bool) (blocked bool, breakChecking bool) {
|
||||
// 检查配置是否为空
|
||||
if firewallPolicy == nil || !firewallPolicy.IsOn || firewallPolicy.Inbound == nil || !firewallPolicy.Inbound.IsOn || firewallPolicy.Mode == firewallconfigs.FirewallModeBypass {
|
||||
return
|
||||
@@ -211,8 +211,13 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
|
||||
}
|
||||
}
|
||||
|
||||
// 是否执行规则
|
||||
if ignoreRules {
|
||||
return
|
||||
}
|
||||
|
||||
// 规则测试
|
||||
w := waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
|
||||
var w = waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
|
||||
if w == nil {
|
||||
return
|
||||
}
|
||||
@@ -267,7 +272,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
|
||||
}
|
||||
|
||||
if this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn {
|
||||
blocked := this.checkWAFResponse(this.web.FirewallPolicy, resp, forceLog, forceLogRequestBody)
|
||||
blocked := this.checkWAFResponse(this.web.FirewallPolicy, resp, forceLog, forceLogRequestBody, false)
|
||||
if blocked {
|
||||
return true
|
||||
}
|
||||
@@ -275,7 +280,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
|
||||
|
||||
// 公用的防火墙设置
|
||||
if this.ReqServer.HTTPFirewallPolicy != nil && this.ReqServer.HTTPFirewallPolicy.IsOn {
|
||||
blocked := this.checkWAFResponse(this.ReqServer.HTTPFirewallPolicy, resp, forceLog, forceLogRequestBody)
|
||||
blocked := this.checkWAFResponse(this.ReqServer.HTTPFirewallPolicy, resp, forceLog, forceLogRequestBody, this.web.FirewallRef.IgnoreGlobalRules)
|
||||
if blocked {
|
||||
return true
|
||||
}
|
||||
@@ -283,12 +288,17 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
|
||||
return
|
||||
}
|
||||
|
||||
func (this *HTTPRequest) checkWAFResponse(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, resp *http.Response, forceLog bool, logRequestBody bool) (blocked bool) {
|
||||
func (this *HTTPRequest) checkWAFResponse(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, resp *http.Response, forceLog bool, logRequestBody bool, ignoreRules bool) (blocked bool) {
|
||||
if firewallPolicy == nil || !firewallPolicy.IsOn || !firewallPolicy.Outbound.IsOn || firewallPolicy.Mode == firewallconfigs.FirewallModeBypass {
|
||||
return
|
||||
}
|
||||
|
||||
w := waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
|
||||
// 是否执行规则
|
||||
if ignoreRules {
|
||||
return
|
||||
}
|
||||
|
||||
var w = waf.SharedWAFManager.FindWAF(firewallPolicy.Id)
|
||||
if w == nil {
|
||||
return
|
||||
}
|
||||
@@ -392,3 +402,22 @@ func (this *HTTPRequest) WAFOnAction(action interface{}) (goNext bool) {
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (this *HTTPRequest) WAFFingerprint() []byte {
|
||||
// 目前只有HTTPS请求才有指纹
|
||||
if !this.IsHTTPS {
|
||||
return nil
|
||||
}
|
||||
|
||||
var requestConn = this.RawReq.Context().Value(HTTPConnContextKey)
|
||||
if requestConn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
clientConn, ok := requestConn.(ClientConnInterface)
|
||||
if ok {
|
||||
return clientConn.Fingerprint()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ func (this *HTTPRequest) doWebsocket(requestHost string, isLastRetry bool) (shou
|
||||
|
||||
requestClientConn, ok := requestConn.(ClientConnInterface)
|
||||
if ok {
|
||||
requestClientConn.SetIsWebsocket(true)
|
||||
requestClientConn.SetIsPersistent(true)
|
||||
}
|
||||
|
||||
clientConn, _, err := this.writer.Hijack()
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/compressions"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/readers"
|
||||
@@ -39,6 +40,10 @@ var webpMaxBufferSize int64 = 1_000_000_000
|
||||
var webpTotalBufferSize int64 = 0
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
var systemMemory = utils.SystemMemoryGB() / 8
|
||||
if systemMemory > 0 {
|
||||
webpMaxBufferSize = int64(systemMemory) * 1024 * 1024 * 1024
|
||||
@@ -99,6 +104,18 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT
|
||||
|
||||
// Prepare 准备输出
|
||||
func (this *HTTPWriter) Prepare(resp *http.Response, size int64, status int, enableCache bool) (delayHeaders bool) {
|
||||
// 清理以前数据,防止重试时发生异常错误
|
||||
if this.compressionCacheWriter != nil {
|
||||
_ = this.compressionCacheWriter.Discard()
|
||||
this.compressionCacheWriter = nil
|
||||
}
|
||||
|
||||
if this.cacheWriter != nil {
|
||||
_ = this.cacheWriter.Discard()
|
||||
this.cacheWriter = nil
|
||||
}
|
||||
|
||||
// 新的请求相关数据
|
||||
this.size = size
|
||||
this.statusCode = status
|
||||
|
||||
@@ -315,6 +332,12 @@ func (this *HTTPWriter) PrepareCache(resp *http.Response, size int64) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 先清理以前的
|
||||
if this.cacheWriter != nil {
|
||||
_ = this.cacheWriter.Discard()
|
||||
}
|
||||
|
||||
cacheWriter, err := storage.OpenWriter(cacheKey, expiresAt, this.StatusCode(), this.calculateHeaderLength(), totalSize, cacheRef.MaxSizeBytes(), this.isPartial)
|
||||
if err != nil {
|
||||
if err == caches.ErrEntityTooLarge && addStatusHeader {
|
||||
@@ -691,6 +714,9 @@ func (this *HTTPWriter) PrepareCompression(resp *http.Response, size int64) {
|
||||
}
|
||||
|
||||
if compressionCacheWriter != nil {
|
||||
if this.compressionCacheWriter != nil {
|
||||
_ = this.compressionCacheWriter.Close()
|
||||
}
|
||||
this.compressionCacheWriter = compressionCacheWriter
|
||||
var teeWriter = writers.NewTeeWriterCloser(this.writer, compressionCacheWriter)
|
||||
teeWriter.OnFail(func(err error) {
|
||||
|
||||
@@ -36,6 +36,15 @@ func (this *BaseListener) buildTLSConfig() *tls.Config {
|
||||
return &tls.Config{
|
||||
Certificates: nil,
|
||||
GetConfigForClient: func(clientInfo *tls.ClientHelloInfo) (config *tls.Config, e error) {
|
||||
// 指纹信息
|
||||
var fingerprint = this.calculateFingerprint(clientInfo)
|
||||
if len(fingerprint) > 0 {
|
||||
clientConn, ok := clientInfo.Conn.(ClientConnInterface)
|
||||
if ok {
|
||||
clientConn.SetFingerprint(fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
tlsPolicy, _, err := this.matchSSL(this.helloServerName(clientInfo))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -50,6 +59,15 @@ func (this *BaseListener) buildTLSConfig() *tls.Config {
|
||||
return tlsPolicy.TLSConfig(), nil
|
||||
},
|
||||
GetCertificate: func(clientInfo *tls.ClientHelloInfo) (certificate *tls.Certificate, e error) {
|
||||
// 指纹信息
|
||||
var fingerprint = this.calculateFingerprint(clientInfo)
|
||||
if len(fingerprint) > 0 {
|
||||
clientConn, ok := clientInfo.Conn.(ClientConnInterface)
|
||||
if ok {
|
||||
clientConn.SetFingerprint(fingerprint)
|
||||
}
|
||||
}
|
||||
|
||||
tlsPolicy, cert, err := this.matchSSL(this.helloServerName(clientInfo))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
10
internal/nodes/listener_base_ext.go
Normal file
10
internal/nodes/listener_base_ext.go
Normal file
@@ -0,0 +1,10 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build !plus
|
||||
|
||||
package nodes
|
||||
|
||||
import "crypto/tls"
|
||||
|
||||
func (this *BaseListener) calculateFingerprint(clientInfo *tls.ClientHelloInfo) []byte {
|
||||
return nil
|
||||
}
|
||||
@@ -226,14 +226,21 @@ func (this *HTTPListener) emptyServer() *serverconfigs.ServerConfig {
|
||||
Type: serverconfigs.ServerTypeHTTPProxy,
|
||||
}
|
||||
|
||||
var accessLogRef = serverconfigs.NewHTTPAccessLogRef()
|
||||
// TODO 需要配置是否记录日志
|
||||
accessLogRef.IsOn = true
|
||||
accessLogRef.Fields = append([]int{}, serverconfigs.HTTPAccessLogDefaultFieldsCodes...)
|
||||
server.Web = &serverconfigs.HTTPWebConfig{
|
||||
IsOn: true,
|
||||
AccessLogRef: accessLogRef,
|
||||
// 检查是否开启访问日志
|
||||
if sharedNodeConfig != nil {
|
||||
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
|
||||
if globalServerConfig != nil && globalServerConfig.HTTPAccessLog.EnableServerNotFound {
|
||||
var accessLogRef = serverconfigs.NewHTTPAccessLogRef()
|
||||
accessLogRef.IsOn = true
|
||||
accessLogRef.Fields = append([]int{}, serverconfigs.HTTPAccessLogDefaultFieldsCodes...)
|
||||
server.Web = &serverconfigs.HTTPWebConfig{
|
||||
IsOn: true,
|
||||
AccessLogRef: accessLogRef,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO 需要对访问频率过多的IP进行惩罚
|
||||
|
||||
return server
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/firewalls"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -23,7 +24,15 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var sharedListenerManager = NewListenerManager()
|
||||
var sharedListenerManager *ListenerManager
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
sharedListenerManager = NewListenerManager()
|
||||
}
|
||||
|
||||
// ListenerManager 端口监听管理器
|
||||
type ListenerManager struct {
|
||||
|
||||
@@ -404,7 +404,7 @@ func NewUDPConn(server *serverconfigs.ServerConfig, addr net.Addr, proxyListener
|
||||
stats.SharedTrafficStatManager.Add(server.Id, "", int64(n), 0, 0, 0, 0, 0, server.ShouldCheckTrafficLimit(), server.PlanId())
|
||||
|
||||
// 带宽
|
||||
stats.SharedBandwidthStatManager.Add(server.UserId, server.Id, int64(n))
|
||||
stats.SharedBandwidthStatManager.Add(server.UserId, server.Id, int64(n), int64(n))
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
|
||||
@@ -236,8 +236,6 @@ func (this *Node) Start() {
|
||||
|
||||
// Daemon 实现守护进程
|
||||
func (this *Node) Daemon() {
|
||||
teaconst.IsDaemon = true
|
||||
|
||||
var isDebug = lists.ContainsString(os.Args, "debug")
|
||||
for {
|
||||
conn, err := this.sock.Dial()
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/shirou/gopsutil/v3/load"
|
||||
"github.com/shirou/gopsutil/v3/mem"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
// 更新内存
|
||||
@@ -31,6 +33,18 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
|
||||
"total": status.MemoryTotal,
|
||||
"used": stat.Used,
|
||||
})
|
||||
|
||||
// 内存严重不足时自动释放内存
|
||||
if stat.Total > 0 {
|
||||
var minFreeMemory = stat.Total / 8
|
||||
if minFreeMemory > 1<<30 {
|
||||
minFreeMemory = 1 << 30
|
||||
}
|
||||
if stat.Free < minFreeMemory {
|
||||
runtime.GC()
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 更新负载
|
||||
|
||||
@@ -4,6 +4,7 @@ package nodes
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -16,6 +17,10 @@ import (
|
||||
var SharedOriginStateManager = NewOriginStateManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
SharedOriginStateManager.Start()
|
||||
|
||||
@@ -19,6 +19,10 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
var manager = NewSystemServiceManager()
|
||||
events.On(events.EventReload, func() {
|
||||
goman.New(func() {
|
||||
|
||||
@@ -4,6 +4,7 @@ package nodes
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -15,6 +16,10 @@ import (
|
||||
var sharedOCSPTask = NewOCSPUpdateTask()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
sharedOCSPTask.version = sharedNodeConfig.OCSPVersion
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package nodes
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
@@ -16,6 +17,10 @@ import (
|
||||
var sharedSyncAPINodesTask = NewSyncAPINodesTask()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventStart, func() {
|
||||
goman.New(func() {
|
||||
sharedSyncAPINodesTask.Start()
|
||||
|
||||
@@ -3,6 +3,7 @@ package nodes
|
||||
import (
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -17,6 +18,10 @@ import (
|
||||
var sharedTOAManager = NewTOAManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventReload, func() {
|
||||
err := sharedTOAManager.Run(sharedNodeConfig.TOA)
|
||||
if err != nil {
|
||||
|
||||
@@ -20,6 +20,10 @@ import (
|
||||
var logChan = make(chan *pb.NodeLog, 64) // 队列数量不需要太长,因为日志通常仅仅为调试用
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
// 定期上传日志
|
||||
var ticker = time.NewTicker(60 * time.Second)
|
||||
if Tea.IsTesting() {
|
||||
|
||||
@@ -5,6 +5,7 @@ package stats
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -21,6 +22,10 @@ var SharedBandwidthStatManager = NewBandwidthStatManager()
|
||||
const bandwidthTimestampDelim = 2 // N秒平均,更为精确
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
SharedBandwidthStatManager.Start()
|
||||
@@ -37,6 +42,7 @@ type BandwidthStat struct {
|
||||
CurrentBytes int64
|
||||
CurrentTimestamp int64
|
||||
MaxBytes int64
|
||||
TotalBytes int64
|
||||
}
|
||||
|
||||
// BandwidthStatManager 服务带宽统计
|
||||
@@ -107,6 +113,7 @@ func (this *BandwidthStatManager) Loop() error {
|
||||
Day: stat.Day,
|
||||
TimeAt: stat.TimeAt,
|
||||
Bytes: stat.MaxBytes / bandwidthTimestampDelim,
|
||||
TotalBytes: stat.TotalBytes,
|
||||
NodeRegionId: regionId,
|
||||
})
|
||||
delete(this.m, key)
|
||||
@@ -132,8 +139,8 @@ func (this *BandwidthStatManager) Loop() error {
|
||||
}
|
||||
|
||||
// Add 添加带宽数据
|
||||
func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64) {
|
||||
if serverId <= 0 || bytes == 0 {
|
||||
func (this *BandwidthStatManager) Add(userId int64, serverId int64, peekBytes int64, totalBytes int64) {
|
||||
if serverId <= 0 || (peekBytes == 0 && totalBytes == 0) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -146,8 +153,8 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64)
|
||||
// 增加TCP Header尺寸,这里默认MTU为1500,且默认为IPv4
|
||||
const mtu = 1500
|
||||
const tcpHeaderSize = 20
|
||||
if bytes > mtu {
|
||||
bytes += bytes * tcpHeaderSize / mtu
|
||||
if peekBytes > mtu {
|
||||
peekBytes += peekBytes * tcpHeaderSize / mtu
|
||||
}
|
||||
|
||||
this.locker.Lock()
|
||||
@@ -156,22 +163,25 @@ func (this *BandwidthStatManager) Add(userId int64, serverId int64, bytes int64)
|
||||
// 此刻如果发生用户ID(userId)的变化也忽略,等N分钟后有新记录后再换
|
||||
|
||||
if stat.CurrentTimestamp == timestamp {
|
||||
stat.CurrentBytes += bytes
|
||||
stat.CurrentBytes += peekBytes
|
||||
} else {
|
||||
stat.CurrentBytes = bytes
|
||||
stat.CurrentBytes = peekBytes
|
||||
stat.CurrentTimestamp = timestamp
|
||||
}
|
||||
if stat.CurrentBytes > stat.MaxBytes {
|
||||
stat.MaxBytes = stat.CurrentBytes
|
||||
}
|
||||
|
||||
stat.TotalBytes += totalBytes
|
||||
} else {
|
||||
this.m[key] = &BandwidthStat{
|
||||
Day: day,
|
||||
TimeAt: timeAt,
|
||||
UserId: userId,
|
||||
ServerId: serverId,
|
||||
CurrentBytes: bytes,
|
||||
MaxBytes: bytes,
|
||||
CurrentBytes: peekBytes,
|
||||
MaxBytes: peekBytes,
|
||||
TotalBytes: totalBytes,
|
||||
CurrentTimestamp: timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,22 +10,22 @@ import (
|
||||
|
||||
func TestBandwidthStatManager_Add(t *testing.T) {
|
||||
var manager = stats.NewBandwidthStatManager()
|
||||
manager.Add(1, 1, 10)
|
||||
manager.Add(1, 1, 10)
|
||||
manager.Add(1, 1, 10)
|
||||
manager.Add(1, 1, 10, 10)
|
||||
manager.Add(1, 1, 10, 10)
|
||||
manager.Add(1, 1, 10, 10)
|
||||
time.Sleep(1 * time.Second)
|
||||
manager.Add(1, 1, 85)
|
||||
manager.Add(1, 1, 85, 85)
|
||||
time.Sleep(1 * time.Second)
|
||||
manager.Add(1, 1, 25)
|
||||
manager.Add(1, 1, 75)
|
||||
manager.Add(1, 1, 25, 25)
|
||||
manager.Add(1, 1, 75, 75)
|
||||
manager.Inspect()
|
||||
}
|
||||
|
||||
func TestBandwidthStatManager_Loop(t *testing.T) {
|
||||
var manager = stats.NewBandwidthStatManager()
|
||||
manager.Add(1, 1, 10)
|
||||
manager.Add(1, 1, 10)
|
||||
manager.Add(1, 1, 10)
|
||||
manager.Add(1, 1, 10, 10)
|
||||
manager.Add(1, 1, 10, 10)
|
||||
manager.Add(1, 1, 10, 10)
|
||||
err := manager.Loop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
@@ -17,8 +17,10 @@ import (
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -45,7 +47,13 @@ type HTTPRequestStatManager struct {
|
||||
|
||||
dailyFirewallRuleGroupMap map[string]int64 // serverId@firewallRuleGroupId@action => count
|
||||
|
||||
serverCityCountMap map[string]int16 // serverIdString => count cities
|
||||
serverSystemCountMap map[string]int16 // serverIdString => count systems
|
||||
serverBrowserCountMap map[string]int16 // serverIdString => count browsers
|
||||
|
||||
totalAttackRequests int64
|
||||
|
||||
locker sync.Mutex
|
||||
}
|
||||
|
||||
// NewHTTPRequestStatManager 获取新对象
|
||||
@@ -59,6 +67,10 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager {
|
||||
systemMap: map[string]int64{},
|
||||
browserMap: map[string]int64{},
|
||||
dailyFirewallRuleGroupMap: map[string]int64{},
|
||||
|
||||
serverCityCountMap: map[string]int16{},
|
||||
serverSystemCountMap: map[string]int16{},
|
||||
serverBrowserCountMap: map[string]int16{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,7 +90,6 @@ func (this *HTTPRequestStatManager) Start() {
|
||||
}
|
||||
})
|
||||
|
||||
var loopTicker = time.NewTicker(1 * time.Second)
|
||||
var uploadTicker = time.NewTicker(30 * time.Minute)
|
||||
if Tea.IsTesting() {
|
||||
uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间,以方便我们调试
|
||||
@@ -86,20 +97,12 @@ func (this *HTTPRequestStatManager) Start() {
|
||||
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "start ...")
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "quit")
|
||||
loopTicker.Stop()
|
||||
uploadTicker.Stop()
|
||||
})
|
||||
for range loopTicker.C {
|
||||
err := this.Loop()
|
||||
if err != nil {
|
||||
if rpc.IsConnError(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error())
|
||||
} else {
|
||||
remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error())
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-uploadTicker.C:
|
||||
|
||||
// 上传Ticker
|
||||
goman.New(func() {
|
||||
for range uploadTicker.C {
|
||||
var tr = trackers.Begin("UPLOAD_REQUEST_STATS")
|
||||
err := this.Upload()
|
||||
tr.End()
|
||||
@@ -110,9 +113,20 @@ func (this *HTTPRequestStatManager) Start() {
|
||||
remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", "upload failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
default:
|
||||
|
||||
}
|
||||
})
|
||||
|
||||
// 分析Ticker
|
||||
for {
|
||||
err := this.Loop()
|
||||
if err != nil {
|
||||
if rpc.IsConnError(err) {
|
||||
remotelogs.Warn("HTTP_REQUEST_STAT_MANAGER", err.Error())
|
||||
} else {
|
||||
remotelogs.Error("HTTP_REQUEST_STAT_MANAGER", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +162,7 @@ func (this *HTTPRequestStatManager) AddRemoteAddr(serverId int64, remoteAddr str
|
||||
|
||||
// AddUserAgent 添加UserAgent
|
||||
func (this *HTTPRequestStatManager) AddUserAgent(serverId int64, userAgent string, ip string) {
|
||||
if len(userAgent) == 0 {
|
||||
if len(userAgent) == 0 || strings.ContainsRune(userAgent, '@') /** 非常重要,防止后面组合字符串时出现异常 **/ {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -183,75 +197,101 @@ func (this *HTTPRequestStatManager) AddFirewallRuleGroupId(serverId int64, firew
|
||||
|
||||
// Loop 单个循环
|
||||
func (this *HTTPRequestStatManager) Loop() error {
|
||||
var timeout = time.NewTimer(10 * time.Minute) // 执行的最大时间
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case ipString := <-this.ipChan:
|
||||
// serverId@ip@bytes@isAttack
|
||||
var pieces = strings.Split(ipString, "@")
|
||||
if len(pieces) < 4 {
|
||||
continue
|
||||
}
|
||||
var serverId = pieces[0]
|
||||
var ip = pieces[1]
|
||||
|
||||
var result = iplib.LookupIP(ip)
|
||||
if result != nil && result.IsOk() {
|
||||
var key = serverId + "@" + result.CountryName() + "@" + result.ProvinceName() + "@" + result.CityName()
|
||||
stat, ok := this.cityMap[key]
|
||||
if !ok {
|
||||
stat = &StatItem{}
|
||||
this.cityMap[key] = stat
|
||||
}
|
||||
stat.Bytes += types.Int64(pieces[2])
|
||||
stat.CountRequests++
|
||||
if types.Int8(pieces[3]) == 1 {
|
||||
stat.AttackBytes += types.Int64(pieces[2])
|
||||
stat.CountAttackRequests++
|
||||
}
|
||||
|
||||
if len(result.ProviderName()) > 0 {
|
||||
this.providerMap[serverId+"@"+result.ProviderName()]++
|
||||
}
|
||||
}
|
||||
|
||||
case userAgentString := <-this.userAgentChan:
|
||||
var atIndex = strings.Index(userAgentString, "@")
|
||||
if atIndex < 0 {
|
||||
continue
|
||||
}
|
||||
var serverId = userAgentString[:atIndex]
|
||||
var userAgent = userAgentString[atIndex+1:]
|
||||
|
||||
var result = SharedUserAgentParser.Parse(userAgent)
|
||||
var osInfo = result.OS
|
||||
if len(osInfo.Name) > 0 {
|
||||
dotIndex := strings.Index(osInfo.Version, ".")
|
||||
if dotIndex > -1 {
|
||||
osInfo.Version = osInfo.Version[:dotIndex]
|
||||
}
|
||||
this.systemMap[serverId+"@"+osInfo.Name+"@"+osInfo.Version]++
|
||||
}
|
||||
|
||||
var browser, browserVersion = result.BrowserName, result.BrowserVersion
|
||||
if len(browser) > 0 {
|
||||
dotIndex := strings.Index(browserVersion, ".")
|
||||
if dotIndex > -1 {
|
||||
browserVersion = browserVersion[:dotIndex]
|
||||
}
|
||||
this.browserMap[serverId+"@"+browser+"@"+browserVersion]++
|
||||
}
|
||||
case firewallRuleGroupString := <-this.firewallRuleGroupChan:
|
||||
this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++
|
||||
case <-timeout.C:
|
||||
break Loop
|
||||
default:
|
||||
break Loop
|
||||
select {
|
||||
case ipString := <-this.ipChan:
|
||||
// serverId@ip@bytes@isAttack
|
||||
var pieces = strings.Split(ipString, "@")
|
||||
if len(pieces) < 4 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
var serverId = pieces[0]
|
||||
var ip = pieces[1]
|
||||
|
||||
timeout.Stop()
|
||||
var result = iplib.LookupIP(ip)
|
||||
if result != nil && result.IsOk() {
|
||||
var key = serverId + "@" + types.String(result.CountryId()) + "@" + types.String(result.ProvinceId()) + "@" + types.String(result.CityId())
|
||||
this.locker.Lock()
|
||||
stat, ok := this.cityMap[key]
|
||||
if !ok {
|
||||
// 检查数量
|
||||
if this.serverCityCountMap[key] > 128 { // 限制单个服务的城市数量,防止数量过多
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
this.serverCityCountMap[key]++ // 需要放在限制之后,因为使用的是int16
|
||||
|
||||
stat = &StatItem{}
|
||||
this.cityMap[key] = stat
|
||||
}
|
||||
stat.Bytes += types.Int64(pieces[2])
|
||||
stat.CountRequests++
|
||||
if types.Int8(pieces[3]) == 1 {
|
||||
stat.AttackBytes += types.Int64(pieces[2])
|
||||
stat.CountAttackRequests++
|
||||
}
|
||||
|
||||
if result.ProviderId() > 0 {
|
||||
this.providerMap[serverId+"@"+types.String(result.ProviderId())]++
|
||||
}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
case userAgentString := <-this.userAgentChan:
|
||||
var atIndex = strings.Index(userAgentString, "@")
|
||||
if atIndex < 0 {
|
||||
return nil
|
||||
}
|
||||
var serverId = userAgentString[:atIndex]
|
||||
var userAgent = userAgentString[atIndex+1:]
|
||||
|
||||
var result = SharedUserAgentParser.Parse(userAgent)
|
||||
var osInfo = result.OS
|
||||
if len(osInfo.Name) > 0 {
|
||||
dotIndex := strings.Index(osInfo.Version, ".")
|
||||
if dotIndex > -1 {
|
||||
osInfo.Version = osInfo.Version[:dotIndex]
|
||||
}
|
||||
this.locker.Lock()
|
||||
|
||||
var systemKey = serverId + "@" + osInfo.Name + "@" + osInfo.Version
|
||||
_, ok := this.systemMap[systemKey]
|
||||
if !ok {
|
||||
if this.serverSystemCountMap[serverId] < 128 { // 限制最大数据,防止攻击
|
||||
this.serverSystemCountMap[serverId]++
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
this.systemMap[systemKey]++
|
||||
}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
var browser, browserVersion = result.BrowserName, result.BrowserVersion
|
||||
if len(browser) > 0 {
|
||||
dotIndex := strings.Index(browserVersion, ".")
|
||||
if dotIndex > -1 {
|
||||
browserVersion = browserVersion[:dotIndex]
|
||||
}
|
||||
this.locker.Lock()
|
||||
|
||||
var browserKey = serverId + "@" + browser + "@" + browserVersion
|
||||
_, ok := this.browserMap[browserKey]
|
||||
if !ok {
|
||||
if this.serverBrowserCountMap[serverId] < 256 { // 限制最大数据,防止攻击
|
||||
this.serverBrowserCountMap[serverId]++
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
this.browserMap[browserKey]++
|
||||
}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
case firewallRuleGroupString := <-this.firewallRuleGroupChan:
|
||||
this.locker.Lock()
|
||||
this.dailyFirewallRuleGroupMap[firewallRuleGroupString]++
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -264,54 +304,178 @@ func (this *HTTPRequestStatManager) Upload() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 月份相关
|
||||
// 拷贝数据
|
||||
this.locker.Lock()
|
||||
var cityMap = this.cityMap
|
||||
var providerMap = this.providerMap
|
||||
var systemMap = this.systemMap
|
||||
var browserMap = this.browserMap
|
||||
var dailyFirewallRuleGroupMap = this.dailyFirewallRuleGroupMap
|
||||
|
||||
this.cityMap = map[string]*StatItem{}
|
||||
this.providerMap = map[string]int64{}
|
||||
this.systemMap = map[string]int64{}
|
||||
this.browserMap = map[string]int64{}
|
||||
this.dailyFirewallRuleGroupMap = map[string]int64{}
|
||||
|
||||
this.serverCityCountMap = map[string]int16{}
|
||||
this.serverSystemCountMap = map[string]int16{}
|
||||
this.serverBrowserCountMap = map[string]int16{}
|
||||
|
||||
this.locker.Unlock()
|
||||
|
||||
// 上传限制
|
||||
var maxCities int16 = 32
|
||||
var maxProviders int16 = 32
|
||||
var maxSystems int16 = 64
|
||||
var maxBrowsers int16 = 64
|
||||
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
|
||||
if nodeConfig != nil {
|
||||
var serverConfig = nodeConfig.GlobalServerConfig // 复制是为了防止在中途修改
|
||||
if serverConfig != nil {
|
||||
var uploadConfig = serverConfig.Stat.Upload
|
||||
if uploadConfig.MaxCities > 0 {
|
||||
maxCities = uploadConfig.MaxCities
|
||||
}
|
||||
if uploadConfig.MaxProviders > 0 {
|
||||
maxProviders = uploadConfig.MaxProviders
|
||||
}
|
||||
if uploadConfig.MaxSystems > 0 {
|
||||
maxSystems = uploadConfig.MaxSystems
|
||||
}
|
||||
if uploadConfig.MaxBrowsers > 0 {
|
||||
maxBrowsers = uploadConfig.MaxBrowsers
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var pbCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{}
|
||||
var pbProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{}
|
||||
var pbSystems = []*pb.UploadServerHTTPRequestStatRequest_System{}
|
||||
var pbBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{}
|
||||
for k, stat := range this.cityMap {
|
||||
|
||||
// 城市
|
||||
for k, stat := range cityMap {
|
||||
var pieces = strings.SplitN(k, "@", 4)
|
||||
var serverId = types.Int64(pieces[0])
|
||||
pbCities = append(pbCities, &pb.UploadServerHTTPRequestStatRequest_RegionCity{
|
||||
ServerId: types.Int64(pieces[0]),
|
||||
CountryName: pieces[1],
|
||||
ProvinceName: pieces[2],
|
||||
CityName: pieces[3],
|
||||
ServerId: serverId,
|
||||
CountryId: types.Int64(pieces[1]),
|
||||
ProvinceId: types.Int64(pieces[2]),
|
||||
CityId: types.Int64(pieces[3]),
|
||||
CountRequests: stat.CountRequests,
|
||||
CountAttackRequests: stat.CountAttackRequests,
|
||||
Bytes: stat.Bytes,
|
||||
AttackBytes: stat.AttackBytes,
|
||||
})
|
||||
}
|
||||
for k, count := range this.providerMap {
|
||||
if len(cityMap) > int(maxCities) {
|
||||
var newPBCities = []*pb.UploadServerHTTPRequestStatRequest_RegionCity{}
|
||||
sort.Slice(pbCities, func(i, j int) bool {
|
||||
return pbCities[i].CountRequests > pbCities[j].CountRequests
|
||||
})
|
||||
var serverCountMap = map[int64]int16{}
|
||||
for _, city := range pbCities {
|
||||
serverCountMap[city.ServerId]++
|
||||
if serverCountMap[city.ServerId] > maxCities {
|
||||
continue
|
||||
}
|
||||
newPBCities = append(newPBCities, city)
|
||||
}
|
||||
if len(pbCities) != len(newPBCities) {
|
||||
pbCities = newPBCities
|
||||
}
|
||||
}
|
||||
|
||||
// 运营商
|
||||
for k, count := range providerMap {
|
||||
var pieces = strings.SplitN(k, "@", 2)
|
||||
var serverId = types.Int64(pieces[0])
|
||||
pbProviders = append(pbProviders, &pb.UploadServerHTTPRequestStatRequest_RegionProvider{
|
||||
ServerId: types.Int64(pieces[0]),
|
||||
Name: pieces[1],
|
||||
Count: count,
|
||||
ServerId: serverId,
|
||||
ProviderId: types.Int64(pieces[1]),
|
||||
Count: count,
|
||||
})
|
||||
}
|
||||
for k, count := range this.systemMap {
|
||||
if len(providerMap) > int(maxProviders) {
|
||||
var newPBProviders = []*pb.UploadServerHTTPRequestStatRequest_RegionProvider{}
|
||||
sort.Slice(pbProviders, func(i, j int) bool {
|
||||
return pbProviders[i].Count > pbProviders[j].Count
|
||||
})
|
||||
var serverCountMap = map[int64]int16{}
|
||||
for _, provider := range pbProviders {
|
||||
serverCountMap[provider.ServerId]++
|
||||
if serverCountMap[provider.ServerId] > maxProviders {
|
||||
continue
|
||||
}
|
||||
newPBProviders = append(newPBProviders, provider)
|
||||
}
|
||||
if len(pbProviders) != len(newPBProviders) {
|
||||
pbProviders = newPBProviders
|
||||
}
|
||||
}
|
||||
|
||||
// 操作系统
|
||||
for k, count := range systemMap {
|
||||
var pieces = strings.SplitN(k, "@", 3)
|
||||
var serverId = types.Int64(pieces[0])
|
||||
pbSystems = append(pbSystems, &pb.UploadServerHTTPRequestStatRequest_System{
|
||||
ServerId: types.Int64(pieces[0]),
|
||||
ServerId: serverId,
|
||||
Name: pieces[1],
|
||||
Version: pieces[2],
|
||||
Count: count,
|
||||
})
|
||||
}
|
||||
for k, count := range this.browserMap {
|
||||
if len(systemMap) > int(maxSystems) {
|
||||
var newPBSystems = []*pb.UploadServerHTTPRequestStatRequest_System{}
|
||||
sort.Slice(pbSystems, func(i, j int) bool {
|
||||
return pbSystems[i].Count > pbSystems[j].Count
|
||||
})
|
||||
var serverCountMap = map[int64]int16{}
|
||||
for _, system := range pbSystems {
|
||||
serverCountMap[system.ServerId]++
|
||||
if serverCountMap[system.ServerId] > maxSystems {
|
||||
continue
|
||||
}
|
||||
newPBSystems = append(newPBSystems, system)
|
||||
}
|
||||
if len(pbSystems) != len(newPBSystems) {
|
||||
pbSystems = newPBSystems
|
||||
}
|
||||
}
|
||||
|
||||
// 浏览器
|
||||
for k, count := range browserMap {
|
||||
var pieces = strings.SplitN(k, "@", 3)
|
||||
var serverId = types.Int64(pieces[0])
|
||||
pbBrowsers = append(pbBrowsers, &pb.UploadServerHTTPRequestStatRequest_Browser{
|
||||
ServerId: types.Int64(pieces[0]),
|
||||
ServerId: serverId,
|
||||
Name: pieces[1],
|
||||
Version: pieces[2],
|
||||
Count: count,
|
||||
})
|
||||
}
|
||||
if len(browserMap) > int(maxBrowsers) {
|
||||
var newPBBrowsers = []*pb.UploadServerHTTPRequestStatRequest_Browser{}
|
||||
sort.Slice(pbBrowsers, func(i, j int) bool {
|
||||
return pbBrowsers[i].Count > pbBrowsers[j].Count
|
||||
})
|
||||
var serverCountMap = map[int64]int16{}
|
||||
for _, browser := range pbBrowsers {
|
||||
serverCountMap[browser.ServerId]++
|
||||
if serverCountMap[browser.ServerId] > maxBrowsers {
|
||||
continue
|
||||
}
|
||||
newPBBrowsers = append(newPBBrowsers, browser)
|
||||
}
|
||||
if len(pbBrowsers) != len(newPBBrowsers) {
|
||||
pbBrowsers = newPBBrowsers
|
||||
}
|
||||
}
|
||||
|
||||
// 防火墙相关
|
||||
var pbFirewallRuleGroups = []*pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{}
|
||||
for k, count := range this.dailyFirewallRuleGroupMap {
|
||||
for k, count := range dailyFirewallRuleGroupMap {
|
||||
var pieces = strings.SplitN(k, "@", 3)
|
||||
pbFirewallRuleGroups = append(pbFirewallRuleGroups, &pb.UploadServerHTTPRequestStatRequest_HTTPFirewallRuleGroup{
|
||||
ServerId: types.Int64(pieces[0]),
|
||||
@@ -321,13 +485,14 @@ func (this *HTTPRequestStatManager) Upload() error {
|
||||
})
|
||||
}
|
||||
|
||||
// 重置数据
|
||||
// 这里需要放到上传数据之前,防止因上传失败而导致统计数据堆积
|
||||
this.cityMap = map[string]*StatItem{}
|
||||
this.providerMap = map[string]int64{}
|
||||
this.systemMap = map[string]int64{}
|
||||
this.browserMap = map[string]int64{}
|
||||
this.dailyFirewallRuleGroupMap = map[string]int64{}
|
||||
// 检查是否有数据
|
||||
if len(pbCities) == 0 &&
|
||||
len(pbProviders) == 0 &&
|
||||
len(pbSystems) == 0 &&
|
||||
len(pbBrowsers) == 0 &&
|
||||
len(pbFirewallRuleGroups) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 上传数据
|
||||
_, err = rpcClient.ServerRPC.UploadServerHTTPRequestStat(rpcClient.Context(), &pb.UploadServerHTTPRequestStatRequest{
|
||||
|
||||
@@ -3,16 +3,16 @@
|
||||
package agents
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -20,11 +20,11 @@ const (
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
db *sql.DB
|
||||
db *dbs.DB
|
||||
path string
|
||||
|
||||
insertAgentIPStmt *sql.Stmt
|
||||
listAgentIPsStmt *sql.Stmt
|
||||
insertAgentIPStmt *dbs.Stmt
|
||||
listAgentIPsStmt *dbs.Stmt
|
||||
}
|
||||
|
||||
func NewDB(path string) *DB {
|
||||
@@ -51,7 +51,7 @@ func (this *DB) Init() error {
|
||||
}
|
||||
|
||||
// TODO 思考 data.db 的数据安全性
|
||||
db, err := sql.Open("sqlite3", "file:"+this.path+"?cache=shared&mode=rwc&_journal_mode=WAL")
|
||||
db, err := dbs.OpenWriter("file:" + this.path + "?cache=shared&mode=rwc&_journal_mode=WAL&_locking_mode=EXCLUSIVE")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -94,9 +94,13 @@ func (this *DB) InsertAgentIP(ipId int64, ip string, agentCode string) error {
|
||||
return errors.New("db should not be nil")
|
||||
}
|
||||
|
||||
this.log("InsertAgentIP", "id:", ipId, "ip:", ip, "agent:", agentCode)
|
||||
_, err := this.insertAgentIPStmt.Exec(ipId, ip, agentCode)
|
||||
if err != nil {
|
||||
// 不提示ID重复错误
|
||||
if strings.Contains(err.Error(), "UNIQUE constraint") {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -130,7 +134,7 @@ func (this *DB) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, stmt := range []*sql.Stmt{
|
||||
for _, stmt := range []*dbs.Stmt{
|
||||
this.insertAgentIPStmt,
|
||||
this.listAgentIPsStmt,
|
||||
} {
|
||||
|
||||
@@ -4,6 +4,7 @@ package agents
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -16,6 +17,10 @@ import (
|
||||
var SharedManager = NewManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
SharedManager.Start()
|
||||
|
||||
@@ -4,6 +4,7 @@ package agents
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -13,6 +14,10 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
SharedQueue.Start()
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -21,6 +22,10 @@ var hasSynced = false
|
||||
var sharedClockManager = NewClockManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(sharedClockManager.Start)
|
||||
})
|
||||
|
||||
@@ -7,14 +7,56 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
rawDB *sql.DB
|
||||
locker *fileutils.Locker
|
||||
rawDB *sql.DB
|
||||
|
||||
enableStat bool
|
||||
}
|
||||
|
||||
func OpenWriter(dsn string) (*DB, error) {
|
||||
return open(dsn, true)
|
||||
}
|
||||
|
||||
func OpenReader(dsn string) (*DB, error) {
|
||||
return open(dsn, false)
|
||||
}
|
||||
|
||||
func open(dsn string, lock bool) (*DB, error) {
|
||||
// locker
|
||||
var locker *fileutils.Locker
|
||||
if lock {
|
||||
var path = dsn
|
||||
var queryIndex = strings.Index(dsn, "?")
|
||||
if queryIndex >= 0 {
|
||||
path = path[:queryIndex]
|
||||
}
|
||||
path = strings.TrimSpace(strings.TrimPrefix(path, "file:"))
|
||||
locker = fileutils.NewLocker(path)
|
||||
err := locker.Lock()
|
||||
if err != nil {
|
||||
remotelogs.Warn("DB", "lock '"+path+"' failed: "+err.Error())
|
||||
locker = nil
|
||||
}
|
||||
}
|
||||
|
||||
// open
|
||||
rawDB, err := sql.Open("sqlite3", dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var db = NewDB(rawDB)
|
||||
db.locker = locker
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func NewDB(rawDB *sql.DB) *DB {
|
||||
var db = &DB{
|
||||
rawDB: rawDB,
|
||||
@@ -30,6 +72,10 @@ func NewDB(rawDB *sql.DB) *DB {
|
||||
return db
|
||||
}
|
||||
|
||||
func (this *DB) SetMaxOpenConns(n int) {
|
||||
this.rawDB.SetMaxOpenConns(n)
|
||||
}
|
||||
|
||||
func (this *DB) EnableStat(b bool) {
|
||||
this.enableStat = b
|
||||
}
|
||||
@@ -81,6 +127,13 @@ func (this *DB) QueryRow(query string, args ...interface{}) *sql.Row {
|
||||
|
||||
func (this *DB) Close() error {
|
||||
events.Remove(fmt.Sprintf("db_%p", this))
|
||||
|
||||
defer func() {
|
||||
if this.locker != nil {
|
||||
_ = this.locker.Release()
|
||||
}
|
||||
}()
|
||||
|
||||
return this.rawDB.Close()
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,10 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
var ticker = time.NewTicker(5 * time.Second)
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
@@ -22,6 +23,10 @@ var (
|
||||
)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventReload, func() {
|
||||
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
|
||||
if nodeConfig != nil {
|
||||
|
||||
82
internal/utils/fileutils/locker.go
Normal file
82
internal/utils/fileutils/locker.go
Normal file
@@ -0,0 +1,82 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package fileutils
|
||||
|
||||
import (
|
||||
"os"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Locker struct {
|
||||
path string
|
||||
fp *os.File
|
||||
}
|
||||
|
||||
func NewLocker(path string) *Locker {
|
||||
return &Locker{
|
||||
path: path + ".lock",
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Locker) TryLock() (ok bool, err error) {
|
||||
if this.fp == nil {
|
||||
fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
this.fp = fp
|
||||
}
|
||||
return this.tryLock()
|
||||
}
|
||||
|
||||
func (this *Locker) Lock() error {
|
||||
if this.fp == nil {
|
||||
fp, err := os.OpenFile(this.path, os.O_CREATE|os.O_WRONLY, 0666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.fp = fp
|
||||
}
|
||||
|
||||
for {
|
||||
b, err := this.tryLock()
|
||||
if err != nil {
|
||||
_ = this.fp.Close()
|
||||
return err
|
||||
}
|
||||
if b {
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Locker) Release() error {
|
||||
err := this.fp.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.fp = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Locker) tryLock() (ok bool, err error) {
|
||||
err = syscall.Flock(int(this.fp.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
|
||||
if err == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
errno, isErrNo := err.(syscall.Errno)
|
||||
if !isErrNo {
|
||||
return
|
||||
}
|
||||
|
||||
if !errno.Temporary() {
|
||||
return
|
||||
}
|
||||
|
||||
err = nil // 不提示错误
|
||||
|
||||
return
|
||||
}
|
||||
24
internal/utils/fileutils/locker_test.go
Normal file
24
internal/utils/fileutils/locker_test.go
Normal file
@@ -0,0 +1,24 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package fileutils_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLocker_Lock(t *testing.T) {
|
||||
var path = "/tmp/file-test"
|
||||
var locker = fileutils.NewLocker(path)
|
||||
err := locker.Lock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_ = locker.Release()
|
||||
|
||||
var locker2 = fileutils.NewLocker(path)
|
||||
err = locker2.Lock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,10 @@ import (
|
||||
var SharedFreeHoursManager = NewFreeHoursManager()
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
SharedFreeHoursManager.Start()
|
||||
|
||||
@@ -3,6 +3,7 @@ package utils_test
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/assert"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
@@ -65,3 +66,15 @@ func TestContainsSameStrings(t *testing.T) {
|
||||
a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"a", "b"}))
|
||||
a.IsTrue(utils.EqualStrings([]string{"a", "b"}, []string{"b", "a"}))
|
||||
}
|
||||
|
||||
func TestToValidUTF8string(t *testing.T) {
|
||||
for _, s := range []string{
|
||||
"https://goedge.cn/",
|
||||
"提升mysql数据表写入速度",
|
||||
"😆",
|
||||
string([]byte{'a', 'b', 130, 131, 132, 133, 134, 'c'}),
|
||||
} {
|
||||
var u = utils.ToValidUTF8string(s)
|
||||
t.Log(s, "["+types.String(len(s))+"]", "=>", u, "["+types.String(len(u))+"]")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,12 +3,17 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/shirou/gopsutil/v3/mem"
|
||||
)
|
||||
|
||||
var systemTotalMemory = -1
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
_ = SystemMemoryGB()
|
||||
}
|
||||
|
||||
@@ -23,5 +28,11 @@ func SystemMemoryGB() int {
|
||||
}
|
||||
|
||||
systemTotalMemory = int(stat.Total / 1024 / 1024 / 1024)
|
||||
if systemTotalMemory <= 0 {
|
||||
systemTotalMemory = 1
|
||||
}
|
||||
|
||||
setMaxMemory(systemTotalMemory)
|
||||
|
||||
return systemTotalMemory
|
||||
}
|
||||
|
||||
23
internal/utils/system_1.19.go
Normal file
23
internal/utils/system_1.19.go
Normal file
@@ -0,0 +1,23 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build go1.19
|
||||
|
||||
package utils
|
||||
|
||||
import (
|
||||
"runtime/debug"
|
||||
)
|
||||
|
||||
// 设置软内存最大值
|
||||
func setMaxMemory(memoryGB int) {
|
||||
if memoryGB <= 0 {
|
||||
memoryGB = 1
|
||||
}
|
||||
var maxMemoryBytes int64 = 0
|
||||
if memoryGB > 10 {
|
||||
maxMemoryBytes = int64(memoryGB-2) << 30 // 超过10G内存的允许剩余2G内存
|
||||
} else {
|
||||
maxMemoryBytes = (int64(memoryGB) << 30) * 80 / 100 // 默认 80%
|
||||
}
|
||||
|
||||
debug.SetMemoryLimit(maxMemoryBytes)
|
||||
}
|
||||
9
internal/utils/system_before_1.19.go
Normal file
9
internal/utils/system_before_1.19.go
Normal file
@@ -0,0 +1,9 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build !go1.19
|
||||
|
||||
package utils
|
||||
|
||||
// 设置软内存最大值
|
||||
func setMaxMemory(memoryGB int) {
|
||||
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"time"
|
||||
@@ -11,6 +12,10 @@ var unixTimeMilli = time.Now().UnixMilli()
|
||||
var unixTimeMilliString = types.String(unixTimeMilli)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
var ticker = time.NewTicker(200 * time.Millisecond)
|
||||
goman.New(func() {
|
||||
for range ticker.C {
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -25,6 +26,7 @@ type BlockAction struct {
|
||||
Body string `yaml:"body" json:"body"` // supports HTML
|
||||
URL string `yaml:"url" json:"url"`
|
||||
Timeout int32 `yaml:"timeout" json:"timeout"`
|
||||
TimeoutMax int32 `yaml:"timeoutMax" json:"timeoutMax"`
|
||||
Scope string `yaml:"scope" json:"scope"`
|
||||
}
|
||||
|
||||
@@ -41,6 +43,7 @@ func (this *BlockAction) Init(waf *WAF) error {
|
||||
}
|
||||
if this.Timeout <= 0 {
|
||||
this.Timeout = waf.DefaultBlockAction.Timeout
|
||||
this.TimeoutMax = waf.DefaultBlockAction.TimeoutMax // 只有没有填写封锁时长的时候才会使用默认的封锁时长最大值
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -65,6 +68,12 @@ func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, reque
|
||||
timeout = 300 // 默认封锁300秒
|
||||
}
|
||||
|
||||
// 随机时长
|
||||
var timeoutMax = this.TimeoutMax
|
||||
if timeoutMax > timeout {
|
||||
timeout = timeout + int32(rands.Int64()%int64(timeoutMax-timeout+1))
|
||||
}
|
||||
|
||||
SharedIPBlackList.RecordIP(IPTypeAll, this.Scope, request.WAFServerId(), request.WAFRemoteIP(), time.Now().Unix()+int64(timeout), waf.Id, waf.UseLocalFirewall, group.Id, set.Id, "")
|
||||
|
||||
if writer != nil {
|
||||
|
||||
@@ -69,14 +69,10 @@ func (this *Get302Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, requ
|
||||
|
||||
http.Redirect(writer, request.WAFRaw(), Get302Path+"?info="+url.QueryEscape(info), http.StatusFound)
|
||||
|
||||
if request.WAFRaw().ProtoMajor == 1 {
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
request.WAFClose()
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
|
||||
return false, false
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ package waf
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
@@ -25,6 +26,10 @@ type notifyTask struct {
|
||||
var notifyChan = make(chan *notifyTask, 128)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
@@ -55,7 +56,7 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req
|
||||
if life <= 0 {
|
||||
life = 600 // 默认10分钟
|
||||
}
|
||||
var setId = m.GetString("setId")
|
||||
var setId = types.String(m.GetInt64("setId"))
|
||||
SharedIPWhiteList.RecordIP("set:"+setId, this.Scope, request.WAFServerId(), request.WAFRemoteIP(), time.Now().Unix()+life, m.GetInt64("policyId"), false, m.GetInt64("groupId"), m.GetInt64("setId"), "")
|
||||
return true, false
|
||||
}
|
||||
@@ -72,10 +73,17 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req
|
||||
}
|
||||
info, err := utils.SimpleEncryptMap(m)
|
||||
if err != nil {
|
||||
remotelogs.Error("WAF_POST_302_ACTION", "encode info failed: "+err.Error())
|
||||
remotelogs.Error("WAF_POST_307_ACTION", "encode info failed: "+err.Error())
|
||||
return true, false
|
||||
}
|
||||
|
||||
// 清空请求内容
|
||||
var req = request.WAFRaw()
|
||||
if req.ContentLength > 0 && req.Body != nil {
|
||||
_, _ = io.Copy(io.Discard, req.Body)
|
||||
_ = req.Body.Close()
|
||||
}
|
||||
|
||||
// 设置Cookie
|
||||
http.SetCookie(writer, &http.Cookie{
|
||||
Name: cookieName,
|
||||
@@ -86,13 +94,9 @@ func (this *Post307Action) Perform(waf *WAF, group *RuleGroup, set *RuleSet, req
|
||||
|
||||
http.Redirect(writer, request.WAFRaw(), request.WAFRaw().URL.String(), http.StatusTemporaryRedirect)
|
||||
|
||||
if request.WAFRaw().ProtoMajor == 1 {
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
request.WAFClose()
|
||||
flusher, ok := writer.(http.Flusher)
|
||||
if ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
return false, false
|
||||
|
||||
@@ -33,6 +33,10 @@ type recordIPTask struct {
|
||||
var recordIPTaskChan = make(chan *recordIPTask, 1024)
|
||||
|
||||
func init() {
|
||||
if !teaconst.IsMain {
|
||||
return
|
||||
}
|
||||
|
||||
events.On(events.EventLoaded, func() {
|
||||
goman.New(func() {
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
|
||||
@@ -162,7 +162,7 @@ func (this *CaptchaValidator) show(actionConfig *CaptchaAction, req requests.Req
|
||||
</div>
|
||||
<div class="ui-input">
|
||||
<p>` + msgPrompt + `</p>
|
||||
<input type="text" name="GOEDGE_WAF_CAPTCHA_CODE" id="GOEDGE_WAF_CAPTCHA_CODE" maxlength="6" autocomplete="off" z-index="1" class="input"/>
|
||||
<input type="text" name="GOEDGE_WAF_CAPTCHA_CODE" id="GOEDGE_WAF_CAPTCHA_CODE" size="` + types.String(countLetters*17/10) + `" maxlength="` + types.String(countLetters) + `" autocomplete="off" z-index="1" class="input"/>
|
||||
</div>
|
||||
<div class="ui-button">
|
||||
<button type="submit" style="line-height:24px;margin-top:10px">` + msgButtonTitle + `</button>
|
||||
@@ -199,7 +199,7 @@ func (this *CaptchaValidator) show(actionConfig *CaptchaAction, req requests.Req
|
||||
</script>
|
||||
<style type="text/css">
|
||||
form { width: 20em; margin: 0 auto; text-align: center; }
|
||||
.input { font-size:16px;line-height:24px; letter-spacing: 15px; padding-left: 10px; width: 140px; }
|
||||
.input { font-size:16px;line-height:24px; letter-spacing:0.2em; min-width: 5em; text-align: center; }
|
||||
address { margin-top: 1em; padding-top: 0.5em; border-top: 1px #ccc solid; text-align: center; }
|
||||
` + msgCss + `
|
||||
</style>
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package checkpoints
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
@@ -35,7 +36,11 @@ type CC2Checkpoint struct {
|
||||
func (this *CC2Checkpoint) RequestValue(req requests.Request, param string, options maps.Map, ruleId int64) (value interface{}, hasRequestBody bool, sysErr error, userErr error) {
|
||||
var keys = options.GetSlice("keys")
|
||||
var keyValues = []string{}
|
||||
var hasRemoteAddr = false
|
||||
for _, key := range keys {
|
||||
if key == "${remoteAddr}" || key == "${rawRemoteAddr}" {
|
||||
hasRemoteAddr = true
|
||||
}
|
||||
keyValues = append(keyValues, req.Format(types.String(key)))
|
||||
}
|
||||
if len(keyValues) == 0 {
|
||||
@@ -66,8 +71,33 @@ func (this *CC2Checkpoint) RequestValue(req requests.Request, param string, opti
|
||||
}
|
||||
}
|
||||
|
||||
var expiresAt = time.Now().Unix() + period
|
||||
var ccKey = "WAF-CC-" + types.String(ruleId) + "-" + strings.Join(keyValues, "@")
|
||||
value = ccCache.IncreaseInt64(ccKey, 1, time.Now().Unix()+period, false)
|
||||
value = ccCache.IncreaseInt64(ccKey, 1, expiresAt, false)
|
||||
|
||||
// 基于指纹统计
|
||||
var enableFingerprint = true
|
||||
if options.Has("enableFingerprint") && !options.GetBool("enableFingerprint") {
|
||||
enableFingerprint = false
|
||||
}
|
||||
if hasRemoteAddr && enableFingerprint {
|
||||
var fingerprint = req.WAFFingerprint()
|
||||
if len(fingerprint) > 0 {
|
||||
var fpKeyValues = []string{}
|
||||
for _, key := range keys {
|
||||
if key == "${remoteAddr}" || key == "${rawRemoteAddr}" {
|
||||
fpKeyValues = append(fpKeyValues, fmt.Sprintf("%x", fingerprint))
|
||||
continue
|
||||
}
|
||||
fpKeyValues = append(fpKeyValues, req.Format(types.String(key)))
|
||||
}
|
||||
var fpCCKey = "WAF-CC-" + types.String(ruleId) + "-" + strings.Join(fpKeyValues, "@")
|
||||
var fpValue = ccCache.IncreaseInt64(fpCCKey, 1, expiresAt, false)
|
||||
if fpValue > value.(int64) {
|
||||
value = fpValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ package waf
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
@@ -39,11 +40,11 @@ func (this *Get302Validator) Run(request requests.Request, writer http.ResponseW
|
||||
}
|
||||
|
||||
// 加入白名单
|
||||
life := m.GetInt64("life")
|
||||
var life = m.GetInt64("life")
|
||||
if life <= 0 {
|
||||
life = 600 // 默认10分钟
|
||||
}
|
||||
setId := m.GetString("setId")
|
||||
var setId = types.String(m.GetInt64("setId"))
|
||||
SharedIPWhiteList.RecordIP("set:"+setId, m.GetString("scope"), request.WAFServerId(), request.WAFRemoteIP(), time.Now().Unix()+life, m.GetInt64("policyId"), false, m.GetInt64("groupId"), m.GetInt64("setId"), "")
|
||||
|
||||
// 返回原始URL
|
||||
|
||||
@@ -32,6 +32,9 @@ type Request interface {
|
||||
// WAFOnAction 动作回调
|
||||
WAFOnAction(action interface{}) (goNext bool)
|
||||
|
||||
// WAFFingerprint 读取连接指纹
|
||||
WAFFingerprint() []byte
|
||||
|
||||
// Format 格式化变量
|
||||
Format(string) string
|
||||
}
|
||||
|
||||
@@ -180,6 +180,7 @@ func (this *WAFManager) ConvertWAF(policy *firewallconfigs.HTTPFirewallPolicy) (
|
||||
Body: policy.BlockOptions.Body,
|
||||
URL: policy.BlockOptions.URL,
|
||||
Timeout: policy.BlockOptions.Timeout,
|
||||
TimeoutMax: policy.BlockOptions.TimeoutMax,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user