Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
20bee16d28 | ||
|
|
b1cd971a21 | ||
|
|
d976a39711 | ||
|
|
accd0236ea | ||
|
|
fc401a1426 | ||
|
|
7e8c09a684 | ||
|
|
37ddff86f1 | ||
|
|
4dc25fb71e | ||
|
|
42883fbe22 | ||
|
|
a88d9a07be |
@@ -226,6 +226,20 @@ func (this *FileListDB) Init() error {
|
||||
err := this.hashMap.Load(this)
|
||||
if err != nil {
|
||||
remotelogs.Error("LIST_FILE_DB", "load hash map failed: "+err.Error()+"(file: "+this.dbPath+")")
|
||||
|
||||
// 自动修复错误
|
||||
// TODO 将来希望能尽可能恢复以往数据库中的内容
|
||||
if strings.Contains(err.Error(), "database is closed") || strings.Contains(err.Error(), "database disk image is malformed") {
|
||||
_ = this.Close()
|
||||
this.deleteDB()
|
||||
remotelogs.Println("LIST_FILE_DB", "recreating the database (file:"+this.dbPath+") ...")
|
||||
err = this.Open(this.dbPath)
|
||||
if err != nil {
|
||||
remotelogs.Error("LIST_FILE_DB", "recreate the database failed: "+err.Error()+" (file:"+this.dbPath+")")
|
||||
} else {
|
||||
_ = this.Init()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -683,3 +697,10 @@ func (this *FileListDB) shouldRecover() bool {
|
||||
_ = result.Close()
|
||||
return shouldRecover
|
||||
}
|
||||
|
||||
// 删除数据库文件
|
||||
func (this *FileListDB) deleteDB() {
|
||||
_ = os.Remove(this.dbPath)
|
||||
_ = os.Remove(this.dbPath + "-shm")
|
||||
_ = os.Remove(this.dbPath + "-wal")
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package teaconst
|
||||
|
||||
const (
|
||||
Version = "1.0.1"
|
||||
Version = "1.0.4"
|
||||
|
||||
ProductName = "Edge Node"
|
||||
ProcessName = "edge-node"
|
||||
|
||||
@@ -88,11 +88,15 @@ type blockIPItem struct {
|
||||
}
|
||||
|
||||
func NewNFTablesFirewall() (*NFTablesFirewall, error) {
|
||||
conn, err := nftables.NewConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var firewall = &NFTablesFirewall{
|
||||
conn: nftables.NewConn(),
|
||||
conn: conn,
|
||||
dropIPQueue: make(chan *blockIPItem, 4096),
|
||||
}
|
||||
err := firewall.init()
|
||||
err = firewall.init()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package nftables_test
|
||||
|
||||
@@ -11,7 +10,10 @@ import (
|
||||
)
|
||||
|
||||
func getIPv4Chain(t *testing.T) *nftables.Chain {
|
||||
var conn = nftables.NewConn()
|
||||
conn, err := nftables.NewConn()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
table, err := conn.GetTable("test_ipv4", nftables.TableFamilyIPv4)
|
||||
if err != nil {
|
||||
if err == nftables.ErrTableNotFound {
|
||||
|
||||
@@ -15,10 +15,14 @@ type Conn struct {
|
||||
rawConn *nft.Conn
|
||||
}
|
||||
|
||||
func NewConn() *Conn {
|
||||
return &Conn{
|
||||
rawConn: &nft.Conn{},
|
||||
func NewConn() (*Conn, error) {
|
||||
conn, err := nft.New()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Conn{
|
||||
rawConn: conn,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (this *Conn) Raw() *nft.Conn {
|
||||
|
||||
@@ -4,7 +4,10 @@
|
||||
|
||||
package nftables
|
||||
|
||||
import "errors"
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var ErrTableNotFound = errors.New("table not found")
|
||||
var ErrChainNotFound = errors.New("chain not found")
|
||||
@@ -15,5 +18,5 @@ func IsNotFound(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return err == ErrTableNotFound || err == ErrChainNotFound || err == ErrSetNotFound || err == ErrRuleNotFound
|
||||
return err == ErrTableNotFound || err == ErrChainNotFound || err == ErrSetNotFound || err == ErrRuleNotFound || strings.Contains(err.Error(), "no such file or directory")
|
||||
}
|
||||
|
||||
65
internal/firewalls/nftables/expration.go
Normal file
65
internal/firewalls/nftables/expration.go
Normal file
@@ -0,0 +1,65 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package nftables
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Expiration struct {
|
||||
m map[string]time.Time // key => expires time
|
||||
|
||||
lastGCAt int64
|
||||
|
||||
locker sync.RWMutex
|
||||
}
|
||||
|
||||
func NewExpiration() *Expiration {
|
||||
return &Expiration{
|
||||
m: map[string]time.Time{},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Expiration) AddUnsafe(key []byte, expires time.Time) {
|
||||
this.m[string(key)] = expires
|
||||
}
|
||||
|
||||
func (this *Expiration) Add(key []byte, expires time.Time) {
|
||||
this.locker.Lock()
|
||||
this.m[string(key)] = expires
|
||||
this.gc()
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *Expiration) Remove(key []byte) {
|
||||
this.locker.Lock()
|
||||
delete(this.m, string(key))
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *Expiration) Contains(key []byte) bool {
|
||||
this.locker.RLock()
|
||||
expires, ok := this.m[string(key)]
|
||||
if ok && expires.Year() > 2000 && time.Now().After(expires) {
|
||||
ok = false
|
||||
}
|
||||
this.locker.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
func (this *Expiration) gc() {
|
||||
// we won't gc too frequently
|
||||
var currentTime = time.Now().Unix()
|
||||
if this.lastGCAt >= currentTime {
|
||||
return
|
||||
}
|
||||
this.lastGCAt = currentTime
|
||||
|
||||
var now = time.Now().Add(-10 * time.Second) // gc elements expired before 10 seconds ago
|
||||
for key, expires := range this.m {
|
||||
if expires.Year() > 2000 && now.After(expires) {
|
||||
delete(this.m, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
59
internal/firewalls/nftables/expration_test.go
Normal file
59
internal/firewalls/nftables/expration_test.go
Normal file
@@ -0,0 +1,59 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package nftables_test
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/firewalls/nftables"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestExpiration_Add(t *testing.T) {
|
||||
var expiration = nftables.NewExpiration()
|
||||
{
|
||||
expiration.Add([]byte{'a', 'b', 'c'}, time.Now())
|
||||
t.Log(expiration.Contains([]byte{'a', 'b', 'c'}))
|
||||
}
|
||||
{
|
||||
expiration.Add([]byte{'a', 'b', 'c'}, time.Now().Add(1*time.Second))
|
||||
t.Log(expiration.Contains([]byte{'a', 'b', 'c'}))
|
||||
}
|
||||
{
|
||||
expiration.Add([]byte{'a', 'b', 'c'}, time.Time{})
|
||||
t.Log(expiration.Contains([]byte{'a', 'b', 'c'}))
|
||||
}
|
||||
{
|
||||
expiration.Add([]byte{'a', 'b', 'c'}, time.Now().Add(-1*time.Second))
|
||||
t.Log(expiration.Contains([]byte{'a', 'b', 'c'}))
|
||||
}
|
||||
{
|
||||
expiration.Add([]byte{'a', 'b', 'c'}, time.Now().Add(-10*time.Second))
|
||||
t.Log(expiration.Contains([]byte{'a', 'b', 'c'}))
|
||||
}
|
||||
{
|
||||
expiration.Add([]byte{'a', 'b', 'c'}, time.Now().Add(1*time.Second))
|
||||
expiration.Remove([]byte{'a', 'b', 'c'})
|
||||
t.Log(expiration.Contains([]byte{'a', 'b', 'c'}))
|
||||
}
|
||||
{
|
||||
expiration.Add(net.ParseIP("10.254.0.75").To4(), time.Now())
|
||||
t.Log(expiration.Contains(net.ParseIP("10.254.0.75").To4()))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkNewExpiration(b *testing.B) {
|
||||
var expiration = nftables.NewExpiration()
|
||||
for i := 0; i < 10_000; i++ {
|
||||
expiration.Add([]byte(types.String(types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255)))), time.Now().Add(3600*time.Second))
|
||||
}
|
||||
b.ResetTimer()
|
||||
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
expiration.Add([]byte(types.String(types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255)))), time.Now().Add(3600*time.Second))
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package nftables
|
||||
|
||||
@@ -35,17 +34,25 @@ type Set struct {
|
||||
conn *Conn
|
||||
rawSet *nft.Set
|
||||
batch *SetBatch
|
||||
|
||||
expiration *Expiration
|
||||
}
|
||||
|
||||
func NewSet(conn *Conn, rawSet *nft.Set) *Set {
|
||||
return &Set{
|
||||
conn: conn,
|
||||
rawSet: rawSet,
|
||||
var set = &Set{
|
||||
conn: conn,
|
||||
rawSet: rawSet,
|
||||
expiration: nil,
|
||||
batch: &SetBatch{
|
||||
conn: conn,
|
||||
rawSet: rawSet,
|
||||
},
|
||||
}
|
||||
|
||||
// retrieve set elements to improve "delete" speed
|
||||
set.initElements()
|
||||
|
||||
return set
|
||||
}
|
||||
|
||||
func (this *Set) Raw() *nft.Set {
|
||||
@@ -57,11 +64,21 @@ func (this *Set) Name() string {
|
||||
}
|
||||
|
||||
func (this *Set) AddElement(key []byte, options *ElementOptions, overwrite bool) error {
|
||||
// check if already exists
|
||||
if this.expiration != nil && !overwrite && this.expiration.Contains(key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
var expiresTime = time.Time{}
|
||||
var rawElement = nft.SetElement{
|
||||
Key: key,
|
||||
}
|
||||
if options != nil {
|
||||
rawElement.Timeout = options.Timeout
|
||||
|
||||
if options.Timeout > 0 {
|
||||
expiresTime = time.UnixMilli(time.Now().UnixMilli() + options.Timeout.Milliseconds())
|
||||
}
|
||||
}
|
||||
err := this.conn.Raw().SetAddElements(this.rawSet, []nft.SetElement{
|
||||
rawElement,
|
||||
@@ -71,9 +88,19 @@ func (this *Set) AddElement(key []byte, options *ElementOptions, overwrite bool)
|
||||
}
|
||||
|
||||
err = this.conn.Commit()
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
if this.expiration != nil {
|
||||
this.expiration.Add(key, expiresTime)
|
||||
}
|
||||
} else {
|
||||
var isFileExistsErr = strings.Contains(err.Error(), "file exists")
|
||||
if !overwrite && isFileExistsErr {
|
||||
// ignore file exists error
|
||||
return nil
|
||||
}
|
||||
|
||||
// retry if exists
|
||||
if overwrite && strings.Contains(err.Error(), "file exists") {
|
||||
if overwrite && isFileExistsErr {
|
||||
deleteErr := this.conn.Raw().SetDeleteElements(this.rawSet, []nft.SetElement{
|
||||
{
|
||||
Key: key,
|
||||
@@ -85,6 +112,11 @@ func (this *Set) AddElement(key []byte, options *ElementOptions, overwrite bool)
|
||||
})
|
||||
if err == nil {
|
||||
err = this.conn.Commit()
|
||||
if err == nil {
|
||||
if this.expiration != nil {
|
||||
this.expiration.Add(key, expiresTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -107,6 +139,11 @@ func (this *Set) AddIPElement(ip string, options *ElementOptions, overwrite bool
|
||||
}
|
||||
|
||||
func (this *Set) DeleteElement(key []byte) error {
|
||||
// if set element does not exist, we return immediately
|
||||
if this.expiration != nil && !this.expiration.Contains(key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := this.conn.Raw().SetDeleteElements(this.rawSet, []nft.SetElement{
|
||||
{
|
||||
Key: key,
|
||||
@@ -116,9 +153,17 @@ func (this *Set) DeleteElement(key []byte) error {
|
||||
return err
|
||||
}
|
||||
err = this.conn.Commit()
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
if this.expiration != nil {
|
||||
this.expiration.Remove(key)
|
||||
}
|
||||
} else {
|
||||
if strings.Contains(err.Error(), "no such file or directory") {
|
||||
err = nil
|
||||
|
||||
if this.expiration != nil {
|
||||
this.expiration.Remove(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
8
internal/firewalls/nftables/set_ext.go
Normal file
8
internal/firewalls/nftables/set_ext.go
Normal file
@@ -0,0 +1,8 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
//go:build linux && !plus
|
||||
|
||||
package nftables
|
||||
|
||||
func (this *Set) initElements() {
|
||||
// NOT IMPLEMENTED
|
||||
}
|
||||
@@ -34,7 +34,7 @@ func getIPv4Set(t *testing.T) *nftables.Set {
|
||||
|
||||
func TestSet_AddElement(t *testing.T) {
|
||||
var set = getIPv4Set(t)
|
||||
err := set.AddElement(net.ParseIP("192.168.2.31").To4(), &nftables.ElementOptions{Timeout: 86400 * time.Second})
|
||||
err := set.AddElement(net.ParseIP("192.168.2.31").To4(), &nftables.ElementOptions{Timeout: 86400 * time.Second}, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package nftables
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package nftables_test
|
||||
|
||||
@@ -10,7 +9,10 @@ import (
|
||||
)
|
||||
|
||||
func getIPv4Table(t *testing.T) *nftables.Table {
|
||||
var conn = nftables.NewConn()
|
||||
conn, err := nftables.NewConn()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
table, err := conn.GetTable("test_ipv4", nftables.TableFamilyIPv4)
|
||||
if err != nil {
|
||||
if err == nftables.ErrTableNotFound {
|
||||
|
||||
@@ -17,13 +17,17 @@ import (
|
||||
type IPListDB struct {
|
||||
db *dbs.DB
|
||||
|
||||
itemTableName string
|
||||
itemTableName string
|
||||
versionTableName string
|
||||
|
||||
deleteExpiredItemsStmt *dbs.Stmt
|
||||
deleteItemStmt *dbs.Stmt
|
||||
insertItemStmt *dbs.Stmt
|
||||
selectItemsStmt *dbs.Stmt
|
||||
selectMaxVersionStmt *dbs.Stmt
|
||||
deleteExpiredItemsStmt *dbs.Stmt
|
||||
deleteItemStmt *dbs.Stmt
|
||||
insertItemStmt *dbs.Stmt
|
||||
selectItemsStmt *dbs.Stmt
|
||||
selectMaxItemVersionStmt *dbs.Stmt
|
||||
|
||||
selectVersionStmt *dbs.Stmt
|
||||
updateVersionStmt *dbs.Stmt
|
||||
|
||||
cleanTicker *time.Ticker
|
||||
|
||||
@@ -34,9 +38,10 @@ type IPListDB struct {
|
||||
|
||||
func NewIPListDB() (*IPListDB, error) {
|
||||
var db = &IPListDB{
|
||||
itemTableName: "ipItems",
|
||||
dir: filepath.Clean(Tea.Root + "/data"),
|
||||
cleanTicker: time.NewTicker(24 * time.Hour),
|
||||
itemTableName: "ipItems",
|
||||
versionTableName: "versions",
|
||||
dir: filepath.Clean(Tea.Root + "/data"),
|
||||
cleanTicker: time.NewTicker(24 * time.Hour),
|
||||
}
|
||||
err := db.init()
|
||||
return db, err
|
||||
@@ -108,6 +113,15 @@ ON "` + this.itemTableName + `" (
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "` + this.versionTableName + `" (
|
||||
"id" integer NOT NULL PRIMARY KEY AUTOINCREMENT,
|
||||
"version" integer DEFAULT 0
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 初始化SQL语句
|
||||
this.deleteExpiredItemsStmt, err = this.db.Prepare(`DELETE FROM "` + this.itemTableName + `" WHERE "expiredAt">0 AND "expiredAt"<?`)
|
||||
if err != nil {
|
||||
@@ -129,7 +143,20 @@ ON "` + this.itemTableName + `" (
|
||||
return err
|
||||
}
|
||||
|
||||
this.selectMaxVersionStmt, err = this.db.Prepare(`SELECT "version" FROM "` + this.itemTableName + `" ORDER BY "id" DESC LIMIT 1`)
|
||||
this.selectMaxItemVersionStmt, err = this.db.Prepare(`SELECT "version" FROM "` + this.itemTableName + `" ORDER BY "id" DESC LIMIT 1`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.selectVersionStmt, err = this.db.Prepare(`SELECT "version" FROM "` + this.versionTableName + `" LIMIT 1`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.updateVersionStmt, err = this.db.Prepare(`REPLACE INTO "` + this.versionTableName + `" ("id", "version") VALUES (1, ?)`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.db = db
|
||||
|
||||
@@ -172,11 +199,15 @@ func (this *IPListDB) AddItem(item *pb.IPItem) error {
|
||||
|
||||
// 如果是删除,则不再创建新记录
|
||||
if item.IsDeleted {
|
||||
return nil
|
||||
return this.UpdateMaxVersion(item.Version)
|
||||
}
|
||||
|
||||
_, err = this.insertItemStmt.Exec(item.ListId, item.ListType, item.IsGlobal, item.Type, item.Id, item.IpFrom, item.IpTo, item.ExpiredAt, item.EventLevel, item.IsDeleted, item.Version, item.NodeId, item.ServerId)
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return this.UpdateMaxVersion(item.Version)
|
||||
}
|
||||
|
||||
func (this *IPListDB) ReadItems(offset int64, size int64) (items []*pb.IPItem, err error) {
|
||||
@@ -210,27 +241,63 @@ func (this *IPListDB) ReadMaxVersion() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
var row = this.selectMaxVersionStmt.QueryRow()
|
||||
if row == nil {
|
||||
return 0
|
||||
// from version table
|
||||
{
|
||||
var row = this.selectVersionStmt.QueryRow()
|
||||
if row == nil {
|
||||
return 0
|
||||
}
|
||||
var version int64
|
||||
err := row.Scan(&version)
|
||||
if err == nil {
|
||||
return version
|
||||
}
|
||||
}
|
||||
var version int64
|
||||
err := row.Scan(&version)
|
||||
if err != nil {
|
||||
return 0
|
||||
|
||||
// from items table
|
||||
{
|
||||
var row = this.selectMaxItemVersionStmt.QueryRow()
|
||||
if row == nil {
|
||||
return 0
|
||||
}
|
||||
var version int64
|
||||
err := row.Scan(&version)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return version
|
||||
}
|
||||
return version
|
||||
}
|
||||
|
||||
// UpdateMaxVersion 修改版本号
|
||||
func (this *IPListDB) UpdateMaxVersion(version int64) error {
|
||||
if this.isClosed {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := this.updateVersionStmt.Exec(version)
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *IPListDB) Close() error {
|
||||
this.isClosed = true
|
||||
|
||||
if this.db != nil {
|
||||
_ = this.deleteExpiredItemsStmt.Close()
|
||||
_ = this.deleteItemStmt.Close()
|
||||
_ = this.insertItemStmt.Close()
|
||||
_ = this.selectItemsStmt.Close()
|
||||
_ = this.selectMaxVersionStmt.Close()
|
||||
for _, stmt := range []*dbs.Stmt{
|
||||
this.deleteExpiredItemsStmt,
|
||||
this.deleteItemStmt,
|
||||
this.insertItemStmt,
|
||||
this.selectItemsStmt,
|
||||
this.selectMaxItemVersionStmt, // ipItems table
|
||||
|
||||
this.selectVersionStmt, // versions table
|
||||
this.updateVersionStmt,
|
||||
} {
|
||||
if stmt != nil {
|
||||
_ = stmt.Close()
|
||||
}
|
||||
}
|
||||
|
||||
return this.db.Close()
|
||||
}
|
||||
|
||||
@@ -79,3 +79,15 @@ func TestIPListDB_ReadMaxVersion(t *testing.T) {
|
||||
}
|
||||
t.Log(db.ReadMaxVersion())
|
||||
}
|
||||
|
||||
func TestIPListDB_UpdateMaxVersion(t *testing.T) {
|
||||
db, err := iplibrary.NewIPListDB()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = db.UpdateMaxVersion(1027)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(db.ReadMaxVersion())
|
||||
}
|
||||
|
||||
@@ -47,17 +47,20 @@ type IPListManager struct {
|
||||
|
||||
db *IPListDB
|
||||
|
||||
version int64
|
||||
pageSize int64
|
||||
lastVersion int64
|
||||
fetchPageSize int64
|
||||
|
||||
listMap map[int64]*IPList
|
||||
locker sync.Mutex
|
||||
|
||||
isFirstTime bool
|
||||
}
|
||||
|
||||
func NewIPListManager() *IPListManager {
|
||||
return &IPListManager{
|
||||
pageSize: 1000,
|
||||
listMap: map[int64]*IPList{},
|
||||
fetchPageSize: 5_000,
|
||||
listMap: map[int64]*IPList{},
|
||||
isFirstTime: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,11 +120,11 @@ func (this *IPListManager) init() {
|
||||
_ = db.DeleteExpiredItems()
|
||||
|
||||
// 本地数据库中最大版本号
|
||||
this.version = db.ReadMaxVersion()
|
||||
this.lastVersion = db.ReadMaxVersion()
|
||||
|
||||
// 从本地数据库中加载
|
||||
var offset int64 = 0
|
||||
var size int64 = 1000
|
||||
var size int64 = 2_000
|
||||
for {
|
||||
items, err := db.ReadItems(offset, size)
|
||||
var l = len(items)
|
||||
@@ -148,6 +151,11 @@ func (this *IPListManager) loop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 第一次同步则打印信息
|
||||
if this.isFirstTime {
|
||||
remotelogs.Println("IP_LIST_MANAGER", "initializing ip items ...")
|
||||
}
|
||||
|
||||
for {
|
||||
hasNext, err := this.fetch()
|
||||
if err != nil {
|
||||
@@ -159,6 +167,12 @@ func (this *IPListManager) loop() error {
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
// 第一次同步则打印信息
|
||||
if this.isFirstTime {
|
||||
this.isFirstTime = false
|
||||
remotelogs.Println("IP_LIST_MANAGER", "finished initializing ip items")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -168,8 +182,8 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
|
||||
return false, err
|
||||
}
|
||||
itemsResp, err := rpcClient.IPItemRPC.ListIPItemsAfterVersion(rpcClient.Context(), &pb.ListIPItemsAfterVersionRequest{
|
||||
Version: this.version,
|
||||
Size: this.pageSize,
|
||||
Version: this.lastVersion,
|
||||
Size: this.fetchPageSize,
|
||||
})
|
||||
if err != nil {
|
||||
if rpc.IsConnError(err) {
|
||||
@@ -211,6 +225,7 @@ func (this *IPListManager) DeleteExpiredItems() {
|
||||
}
|
||||
}
|
||||
|
||||
// 处理IP条目
|
||||
func (this *IPListManager) processItems(items []*pb.IPItem, fromRemote bool) {
|
||||
var changedLists = map[*IPList]zero.Zero{}
|
||||
for _, item := range items {
|
||||
@@ -280,8 +295,8 @@ func (this *IPListManager) processItems(items []*pb.IPItem, fromRemote bool) {
|
||||
|
||||
if fromRemote {
|
||||
var latestVersion = items[len(items)-1].Version
|
||||
if latestVersion > this.version {
|
||||
this.version = latestVersion
|
||||
if latestVersion > this.lastVersion {
|
||||
this.lastVersion = latestVersion
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/stats"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
connutils "github.com/TeaOSLab/EdgeNode/internal/utils/conns"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils/fasttime"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/waf"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
@@ -34,6 +35,7 @@ type ClientConn struct {
|
||||
hasRead bool
|
||||
|
||||
isLO bool // 是否为环路
|
||||
isNoStat bool // 是否不统计带宽
|
||||
isInAllowList bool
|
||||
|
||||
hasResetSYNFlood bool
|
||||
@@ -53,15 +55,15 @@ type ClientConn struct {
|
||||
func NewClientConn(rawConn net.Conn, isHTTP bool, isTLS bool, isInAllowList bool) net.Conn {
|
||||
// 是否为环路
|
||||
var remoteAddr = rawConn.RemoteAddr().String()
|
||||
var isLO = strings.HasPrefix(remoteAddr, "127.0.0.1:") || strings.HasPrefix(remoteAddr, "[::1]:")
|
||||
|
||||
var conn = &ClientConn{
|
||||
BaseClientConn: BaseClientConn{rawConn: rawConn},
|
||||
isTLS: isTLS,
|
||||
isHTTP: isHTTP,
|
||||
isLO: isLO,
|
||||
isLO: strings.HasPrefix(remoteAddr, "127.0.0.1:") || strings.HasPrefix(remoteAddr, "[::1]:"),
|
||||
isNoStat: connutils.IsNoStatConn(rawConn.RemoteAddr().String()),
|
||||
isInAllowList: isInAllowList,
|
||||
createdAt: time.Now().Unix(),
|
||||
createdAt: fasttime.Now().Unix(),
|
||||
}
|
||||
|
||||
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
|
||||
@@ -85,7 +87,7 @@ func NewClientConn(rawConn net.Conn, isHTTP bool, isTLS bool, isInAllowList bool
|
||||
|
||||
func (this *ClientConn) Read(b []byte) (n int, err error) {
|
||||
if this.isDebugging {
|
||||
this.lastReadAt = time.Now().Unix()
|
||||
this.lastReadAt = fasttime.Now().Unix()
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@@ -151,7 +153,7 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
if this.isDebugging {
|
||||
this.lastWriteAt = time.Now().Unix()
|
||||
this.lastWriteAt = fasttime.Now().Unix()
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@@ -184,7 +186,7 @@ func (this *ClientConn) Write(b []byte) (n int, err error) {
|
||||
// 统计当前服务带宽
|
||||
if this.serverId > 0 {
|
||||
// TODO 需要加入在serverId绑定之前的带宽
|
||||
if !this.isLO || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽
|
||||
if !this.isNoStat || Tea.IsTesting() { // 环路不统计带宽,避免缓存预热等行为产生带宽
|
||||
atomic.AddUint64(&teaconst.OutTrafficBytes, uint64(n))
|
||||
|
||||
var cost = time.Since(before).Seconds()
|
||||
@@ -309,7 +311,7 @@ func (this *ClientConn) increaseSYNFlood(synFloodConfig *firewallconfigs.SYNFloo
|
||||
_ = this.SetLinger(0)
|
||||
_ = this.Close()
|
||||
|
||||
waf.SharedIPBlackList.RecordIP(waf.IPTypeAll, firewallconfigs.FirewallScopeGlobal, 0, ip, time.Now().Unix()+int64(timeout), 0, true, 0, 0, "疑似SYN Flood攻击,当前1分钟"+types.String(result)+"次空连接")
|
||||
waf.SharedIPBlackList.RecordIP(waf.IPTypeAll, firewallconfigs.FirewallScopeGlobal, 0, ip, fasttime.Now().Unix()+int64(timeout), 0, true, 0, 0, "疑似SYN Flood攻击,当前1分钟"+types.String(result)+"次空连接")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
connutils "github.com/TeaOSLab/EdgeNode/internal/utils/conns"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"io"
|
||||
"net"
|
||||
@@ -61,7 +62,12 @@ func NewHTTPCacheTaskManager() *HTTPCacheTaskManager {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return net.Dial(network, "127.0.0.1:"+port)
|
||||
conn, err := net.Dial(network, "127.0.0.1:"+port)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return connutils.NewNoStat(conn), nil
|
||||
},
|
||||
MaxIdleConns: 128,
|
||||
MaxIdleConnsPerHost: 32,
|
||||
|
||||
@@ -40,7 +40,7 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
|
||||
if minFreeMemory > 1<<30 {
|
||||
minFreeMemory = 1 << 30
|
||||
}
|
||||
if stat.Free < minFreeMemory {
|
||||
if stat.Available > 0 && stat.Available < minFreeMemory {
|
||||
runtime.GC()
|
||||
debug.FreeOSMemory()
|
||||
}
|
||||
|
||||
@@ -54,6 +54,9 @@ type HTTPRequestStatManager struct {
|
||||
totalAttackRequests int64
|
||||
|
||||
locker sync.Mutex
|
||||
|
||||
monitorTicker *time.Ticker
|
||||
uploadTicker *time.Ticker
|
||||
}
|
||||
|
||||
// NewHTTPRequestStatManager 获取新对象
|
||||
@@ -77,12 +80,12 @@ func NewHTTPRequestStatManager() *HTTPRequestStatManager {
|
||||
// Start 启动
|
||||
func (this *HTTPRequestStatManager) Start() {
|
||||
// 上传请求总数
|
||||
var monitorTicker = time.NewTicker(1 * time.Minute)
|
||||
this.monitorTicker = time.NewTicker(1 * time.Minute)
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
monitorTicker.Stop()
|
||||
this.monitorTicker.Stop()
|
||||
})
|
||||
goman.New(func() {
|
||||
for range monitorTicker.C {
|
||||
for range this.monitorTicker.C {
|
||||
if this.totalAttackRequests > 0 {
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemAttackRequests, maps.Map{"total": this.totalAttackRequests})
|
||||
this.totalAttackRequests = 0
|
||||
@@ -90,19 +93,19 @@ func (this *HTTPRequestStatManager) Start() {
|
||||
}
|
||||
})
|
||||
|
||||
var uploadTicker = time.NewTicker(30 * time.Minute)
|
||||
this.uploadTicker = time.NewTicker(30 * time.Minute)
|
||||
if Tea.IsTesting() {
|
||||
uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间,以方便我们调试
|
||||
this.uploadTicker = time.NewTicker(10 * time.Second) // 在测试环境下缩短Ticker时间,以方便我们调试
|
||||
}
|
||||
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "start ...")
|
||||
events.OnKey(events.EventQuit, this, func() {
|
||||
remotelogs.Println("HTTP_REQUEST_STAT_MANAGER", "quit")
|
||||
uploadTicker.Stop()
|
||||
this.uploadTicker.Stop()
|
||||
})
|
||||
|
||||
// 上传Ticker
|
||||
goman.New(func() {
|
||||
for range uploadTicker.C {
|
||||
for range this.uploadTicker.C {
|
||||
var tr = trackers.Begin("UPLOAD_REQUEST_STATS")
|
||||
err := this.Upload()
|
||||
tr.End()
|
||||
@@ -204,34 +207,38 @@ func (this *HTTPRequestStatManager) Loop() error {
|
||||
if len(pieces) < 4 {
|
||||
return nil
|
||||
}
|
||||
var serverId = pieces[0]
|
||||
var serverIdString = pieces[0]
|
||||
var ip = pieces[1]
|
||||
|
||||
var result = iplib.LookupIP(ip)
|
||||
if result != nil && result.IsOk() {
|
||||
var key = serverId + "@" + types.String(result.CountryId()) + "@" + types.String(result.ProvinceId()) + "@" + types.String(result.CityId())
|
||||
this.locker.Lock()
|
||||
stat, ok := this.cityMap[key]
|
||||
if !ok {
|
||||
// 检查数量
|
||||
if this.serverCityCountMap[key] > 128 { // 限制单个服务的城市数量,防止数量过多
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
this.serverCityCountMap[key]++ // 需要放在限制之后,因为使用的是int16
|
||||
if result.CountryId() > 0 {
|
||||
var key = serverIdString + "@" + types.String(result.CountryId()) + "@" + types.String(result.ProvinceId()) + "@" + types.String(result.CityId())
|
||||
stat, ok := this.cityMap[key]
|
||||
if !ok {
|
||||
// 检查数量
|
||||
if this.serverCityCountMap[serverIdString] > 128 { // 限制单个服务的城市数量,防止数量过多
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
this.serverCityCountMap[serverIdString]++ // 需要放在限制之后,因为使用的是int16
|
||||
|
||||
stat = &StatItem{}
|
||||
this.cityMap[key] = stat
|
||||
}
|
||||
stat.Bytes += types.Int64(pieces[2])
|
||||
stat.CountRequests++
|
||||
if types.Int8(pieces[3]) == 1 {
|
||||
stat.AttackBytes += types.Int64(pieces[2])
|
||||
stat.CountAttackRequests++
|
||||
stat = &StatItem{}
|
||||
this.cityMap[key] = stat
|
||||
}
|
||||
stat.Bytes += types.Int64(pieces[2])
|
||||
stat.CountRequests++
|
||||
if types.Int8(pieces[3]) == 1 {
|
||||
stat.AttackBytes += types.Int64(pieces[2])
|
||||
stat.CountAttackRequests++
|
||||
}
|
||||
}
|
||||
|
||||
if result.ProviderId() > 0 {
|
||||
this.providerMap[serverId+"@"+types.String(result.ProviderId())]++
|
||||
this.providerMap[serverIdString+"@"+types.String(result.ProviderId())]++
|
||||
} else if utils.IsLocalIP(ip) { // 局域网IP
|
||||
this.providerMap[serverIdString+"@258"]++
|
||||
}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
@@ -240,7 +247,7 @@ func (this *HTTPRequestStatManager) Loop() error {
|
||||
if atIndex < 0 {
|
||||
return nil
|
||||
}
|
||||
var serverId = userAgentString[:atIndex]
|
||||
var serverIdString = userAgentString[:atIndex]
|
||||
var userAgent = userAgentString[atIndex+1:]
|
||||
|
||||
var result = SharedUserAgentParser.Parse(userAgent)
|
||||
@@ -252,11 +259,11 @@ func (this *HTTPRequestStatManager) Loop() error {
|
||||
}
|
||||
this.locker.Lock()
|
||||
|
||||
var systemKey = serverId + "@" + osInfo.Name + "@" + osInfo.Version
|
||||
var systemKey = serverIdString + "@" + osInfo.Name + "@" + osInfo.Version
|
||||
_, ok := this.systemMap[systemKey]
|
||||
if !ok {
|
||||
if this.serverSystemCountMap[serverId] < 128 { // 限制最大数据,防止攻击
|
||||
this.serverSystemCountMap[serverId]++
|
||||
if this.serverSystemCountMap[serverIdString] < 128 { // 限制最大数据,防止攻击
|
||||
this.serverSystemCountMap[serverIdString]++
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
@@ -274,11 +281,11 @@ func (this *HTTPRequestStatManager) Loop() error {
|
||||
}
|
||||
this.locker.Lock()
|
||||
|
||||
var browserKey = serverId + "@" + browser + "@" + browserVersion
|
||||
var browserKey = serverIdString + "@" + browser + "@" + browserVersion
|
||||
_, ok := this.browserMap[browserKey]
|
||||
if !ok {
|
||||
if this.serverBrowserCountMap[serverId] < 256 { // 限制最大数据,防止攻击
|
||||
this.serverBrowserCountMap[serverId]++
|
||||
if this.serverBrowserCountMap[serverIdString] < 256 { // 限制最大数据,防止攻击
|
||||
this.serverBrowserCountMap[serverIdString]++
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
@@ -374,7 +381,7 @@ func (this *HTTPRequestStatManager) Upload() error {
|
||||
sort.Slice(pbCities, func(i, j int) bool {
|
||||
return pbCities[i].CountRequests > pbCities[j].CountRequests
|
||||
})
|
||||
var serverCountMap = map[int64]int16{}
|
||||
var serverCountMap = map[int64]int16{} // serverId => count
|
||||
for _, city := range pbCities {
|
||||
serverCountMap[city.ServerId]++
|
||||
if serverCountMap[city.ServerId] > maxCities {
|
||||
|
||||
72
internal/utils/conns/conn_no_stat.go
Normal file
72
internal/utils/conns/conn_no_stat.go
Normal file
@@ -0,0 +1,72 @@
|
||||
// Copyright 2023 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
|
||||
|
||||
package connutils
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/zero"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 记录不需要带宽统计的连接
|
||||
// 比如本地的清理和预热
|
||||
var noStatAddrMap = map[string]zero.Zero{} // addr => Zero
|
||||
var noStatLocker = &sync.RWMutex{}
|
||||
|
||||
// IsNoStatConn 检查是否为不统计连接
|
||||
func IsNoStatConn(addr string) bool {
|
||||
noStatLocker.RLock()
|
||||
_, ok := noStatAddrMap[addr]
|
||||
noStatLocker.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
type NoStatConn struct {
|
||||
rawConn net.Conn
|
||||
}
|
||||
|
||||
func NewNoStat(rawConn net.Conn) net.Conn {
|
||||
noStatLocker.Lock()
|
||||
noStatAddrMap[rawConn.LocalAddr().String()] = zero.New()
|
||||
noStatLocker.Unlock()
|
||||
return &NoStatConn{rawConn: rawConn}
|
||||
}
|
||||
|
||||
func (this *NoStatConn) Read(b []byte) (n int, err error) {
|
||||
return this.rawConn.Read(b)
|
||||
}
|
||||
|
||||
func (this *NoStatConn) Write(b []byte) (n int, err error) {
|
||||
return this.rawConn.Write(b)
|
||||
}
|
||||
|
||||
func (this *NoStatConn) Close() error {
|
||||
err := this.rawConn.Close()
|
||||
|
||||
noStatLocker.Lock()
|
||||
delete(noStatAddrMap, this.rawConn.LocalAddr().String())
|
||||
noStatLocker.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *NoStatConn) LocalAddr() net.Addr {
|
||||
return this.rawConn.LocalAddr()
|
||||
}
|
||||
|
||||
func (this *NoStatConn) RemoteAddr() net.Addr {
|
||||
return this.rawConn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (this *NoStatConn) SetDeadline(t time.Time) error {
|
||||
return this.rawConn.SetDeadline(t)
|
||||
}
|
||||
|
||||
func (this *NoStatConn) SetReadDeadline(t time.Time) error {
|
||||
return this.rawConn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (this *NoStatConn) SetWriteDeadline(t time.Time) error {
|
||||
return this.rawConn.SetWriteDeadline(t)
|
||||
}
|
||||
Reference in New Issue
Block a user