Compare commits

...

8 Commits

Author SHA1 Message Date
刘祥超
2390a3ef61 版本号更改为1.2.6 2023-07-28 09:28:05 +08:00
刘祥超
ffda81715f 可以修改访问未绑定域名时的状态码 2023-07-27 11:21:35 +08:00
刘祥超
f991700031 未绑定域名页面提示、访问节点IP显示自定义内容支持变量 2023-07-27 10:50:18 +08:00
刘祥超
e1ba6a90ff 增加重载一组网站事件 2023-07-27 10:37:16 +08:00
刘祥超
354161037b 优化升级程序 2023-07-27 10:03:29 +08:00
刘祥超
00d28df3ee 优化本地数据库关闭时提示 2023-07-27 10:03:18 +08:00
刘祥超
23abed0949 改进缓存相关错误提示 2023-07-26 19:12:02 +08:00
刘祥超
2acf01dcb7 刷新/预热缓存任务可以并行处理 2023-07-26 18:45:17 +08:00
13 changed files with 217 additions and 43 deletions

View File

@@ -7,7 +7,7 @@ import "errors"
// 常用的几个错误
var (
ErrNotFound = errors.New("cache not found")
ErrFileIsWriting = errors.New("the file is writing")
ErrFileIsWriting = errors.New("the cache file is updating")
ErrInvalidRange = errors.New("invalid range")
ErrEntityTooLarge = errors.New("entity too large")
ErrWritingUnavailable = errors.New("writing unavailable")

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "1.2.5"
Version = "1.2.6"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -3,10 +3,11 @@ package events
type Event = string
const (
EventStart Event = "start" // start loading
EventLoaded Event = "loaded" // first load
EventQuit Event = "quit" // quit node gracefully
EventReload Event = "reload" // reload config
EventTerminated Event = "terminated" // process terminated
EventNFTablesReady Event = "nftablesReady" // nftables ready
EventStart Event = "start" // start loading
EventLoaded Event = "loaded" // first load
EventQuit Event = "quit" // quit node gracefully
EventReload Event = "reload" // reload config
EventTerminated Event = "terminated" // process terminated
EventNFTablesReady Event = "nftablesReady" // nftables ready
EventReloadSomeServers Event = "reloadSomeServers" // reload some servers
)

View File

@@ -24,6 +24,12 @@ func On(event Event, callback func()) {
OnKey(event, nil, callback)
}
func OnEvents(events []Event, callback func()) {
for _, event := range events {
On(event, callback)
}
}
func OnClose(callback func()) {
On(EventQuit, callback)
On(EventTerminated, callback)

View File

@@ -1,27 +1,28 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package goman
package goman_test
import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"testing"
"time"
)
func TestNew(t *testing.T) {
New(func() {
goman.New(func() {
t.Log("Hello")
t.Log(List())
t.Log(goman.List())
})
time.Sleep(1 * time.Second)
t.Log(List())
t.Log(goman.List())
time.Sleep(1 * time.Second)
}
func TestNewWithArgs(t *testing.T) {
NewWithArgs(func(args ...interface{}) {
goman.NewWithArgs(func(args ...interface{}) {
t.Log(args[0], args[1])
}, 1, 2)
time.Sleep(1 * time.Second)

View File

@@ -0,0 +1,52 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package goman
import (
"github.com/TeaOSLab/EdgeNode/internal/zero"
"runtime"
"sync"
)
type TaskGroup struct {
semi chan zero.Zero
wg *sync.WaitGroup
locker *sync.RWMutex
}
func NewTaskGroup() *TaskGroup {
var concurrent = runtime.NumCPU()
if concurrent <= 1 {
concurrent = 2
}
return &TaskGroup{
semi: make(chan zero.Zero, concurrent),
wg: &sync.WaitGroup{},
locker: &sync.RWMutex{},
}
}
func (this *TaskGroup) Run(f func()) {
this.wg.Add(1)
go func() {
defer this.wg.Done()
this.semi <- zero.Zero{}
f()
<-this.semi
}()
}
func (this *TaskGroup) Wait() {
this.wg.Wait()
}
func (this *TaskGroup) Lock() {
this.locker.Lock()
}
func (this *TaskGroup) Unlock() {
this.locker.Unlock()
}

View File

@@ -0,0 +1,30 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package goman_test
import (
"github.com/TeaOSLab/EdgeNode/internal/goman"
"runtime"
"testing"
)
func TestNewTaskGroup(t *testing.T) {
var group = goman.NewTaskGroup()
var m = map[int]bool{}
for i := 0; i < runtime.NumCPU()*2; i++ {
var index = i
group.Run(func() {
t.Log("task", index)
group.Lock()
_, ok := m[index]
if ok {
t.Error("duplicated:", index)
}
m[index] = true
group.Unlock()
})
}
group.Wait()
}

View File

@@ -19,6 +19,7 @@ import (
"io"
"net"
"net/http"
"os"
"regexp"
"strings"
"time"
@@ -131,21 +132,29 @@ func (this *HTTPCacheTaskManager) Loop() error {
var pbResults = []*pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{}
var taskGroup = goman.NewTaskGroup()
for _, key := range keys {
err = this.processKey(key)
var taskKey = key
taskGroup.Run(func() {
processErr := this.processKey(taskKey)
var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{
Id: taskKey.Id,
NodeClusterId: taskKey.NodeClusterId,
Error: "",
}
var pbResult = &pb.UpdateHTTPCacheTaskKeysStatusRequest_KeyResult{
Id: key.Id,
NodeClusterId: key.NodeClusterId,
Error: "",
}
if processErr != nil {
pbResult.Error = processErr.Error()
}
if err != nil {
pbResult.Error = err.Error()
}
pbResults = append(pbResults, pbResult)
taskGroup.Lock()
pbResults = append(pbResults, pbResult)
taskGroup.Unlock()
})
}
taskGroup.Wait()
_, err = rpcClient.HTTPCacheTaskKeyRPC.UpdateHTTPCacheTaskKeysStatus(rpcClient.Context(), &pb.UpdateHTTPCacheTaskKeysStatusRequest{KeyResults: pbResults})
if err != nil {
return err
@@ -242,6 +251,7 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
req.Header.Set("Accept-Encoding", "gzip, deflate, br")
resp, err := this.httpClient.Do(req)
if err != nil {
err = this.simplifyErr(err)
return errors.New("request failed: " + fullKey + ": " + err.Error())
}
@@ -249,13 +259,32 @@ func (this *HTTPCacheTaskManager) fetchKey(key *pb.HTTPCacheTaskKey) error {
_ = resp.Body.Close()
}()
// 读取内容,以便于生成缓存
_, _ = io.Copy(io.Discard, resp.Body)
// 处理502
if resp.StatusCode == http.StatusBadGateway {
return errors.New("read origin site timeout")
}
// 读取内容,以便于生成缓存
_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
if err != io.EOF {
err = this.simplifyErr(err)
return errors.New("request failed: " + fullKey + ": " + err.Error())
} else {
err = nil
}
}
return nil
}
func (this *HTTPCacheTaskManager) simplifyErr(err error) error {
if err == nil {
return nil
}
if os.IsTimeout(err) {
return errors.New("timeout to read origin site")
}
return err
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
"github.com/TeaOSLab/EdgeNode/internal/ttlcache"
"github.com/TeaOSLab/EdgeNode/internal/waf"
"github.com/iwind/TeaGo/types"
"net"
"net/http"
"time"
@@ -35,9 +36,24 @@ func (this *HTTPRequest) doMismatch() {
// 根据配置进行相应的处理
var globalServerConfig = sharedNodeConfig.GlobalServerConfig
if globalServerConfig != nil && globalServerConfig.HTTPAll.MatchDomainStrictly {
var statusCode = 404
var httpAllConfig = globalServerConfig.HTTPAll
var mismatchAction = httpAllConfig.DomainMismatchAction
if mismatchAction != nil && mismatchAction.Options != nil {
var mismatchStatusCode = mismatchAction.Options.GetInt("statusCode")
if mismatchStatusCode > 0 && mismatchStatusCode >= 100 && mismatchStatusCode < 1000 {
statusCode = mismatchStatusCode
}
}
// 是否正在访问IP
if globalServerConfig.HTTPAll.NodeIPShowPage && net.ParseIP(this.ReqHost) != nil {
_, _ = this.writer.WriteString(globalServerConfig.HTTPAll.NodeIPPageHTML)
var contentHTML = this.Format(globalServerConfig.HTTPAll.NodeIPPageHTML)
this.writer.Header().Set("Content-Type", "text/html; charset=utf-8")
this.writer.Header().Set("Content-Length", types.String(len(contentHTML)))
this.writer.WriteHeader(statusCode)
_, _ = this.writer.WriteString(contentHTML)
return
}
@@ -55,13 +71,13 @@ func (this *HTTPRequest) doMismatch() {
}
// 处理当前连接
var httpAllConfig = globalServerConfig.HTTPAll
var mismatchAction = httpAllConfig.DomainMismatchAction
if mismatchAction != nil && mismatchAction.Code == "page" {
if mismatchAction.Options != nil {
var contentHTML = this.Format(mismatchAction.Options.GetString("contentHTML"))
this.writer.Header().Set("Content-Type", "text/html; charset=utf-8")
this.writer.WriteHeader(mismatchAction.Options.GetInt("statusCode"))
_, _ = this.writer.Write([]byte(mismatchAction.Options.GetString("contentHTML")))
this.writer.Header().Set("Content-Length", types.String(len(contentHTML)))
this.writer.WriteHeader(statusCode)
_, _ = this.writer.Write([]byte(contentHTML))
} else {
http.Error(this.writer, "404 page not found: '"+this.URL()+"'", http.StatusNotFound)
}

View File

@@ -1056,6 +1056,9 @@ func (this *Node) reloadServer() {
if err != nil {
remotelogs.Error("NODE", "apply server config error: "+err.Error())
}
// notify event
events.Notify(events.EventReloadSomeServers)
}
}

View File

@@ -29,6 +29,7 @@ var sharedUpgradeManager = NewUpgradeManager()
type UpgradeManager struct {
isInstalling bool
lastFile string
exe string
}
// NewUpgradeManager 获取新对象
@@ -38,6 +39,14 @@ func NewUpgradeManager() *UpgradeManager {
// Start 启动升级
func (this *UpgradeManager) Start() {
// 必须放在文件解压之前读取可执行文件路径,防止解析之后,当前的可执行文件路径发生改变
exe, err := os.Executable()
if err != nil {
remotelogs.Error("UPGRADE_MANAGER", "can not find current executable file name")
return
}
this.exe = exe
// 测试环境下不更新
if Tea.IsTesting() {
return
@@ -49,7 +58,7 @@ func (this *UpgradeManager) Start() {
this.isInstalling = true
remotelogs.Println("UPGRADE_MANAGER", "upgrading node ...")
err := this.install()
err = this.install()
if err != nil {
remotelogs.Error("UPGRADE_MANAGER", "download failed: "+err.Error())
@@ -104,7 +113,7 @@ func (this *UpgradeManager) install() error {
remotelogs.Println("UPGRADE_MANAGER", "downloading new node ...")
var path = dir + "/edge-node" + ".tmp"
var path = dir + "/edge-node.tmp"
fp, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0777)
if err != nil {
return err
@@ -238,11 +247,6 @@ func (this *UpgradeManager) restart() error {
if DaemonIsOn && DaemonPid == os.Getppid() {
utils.Exit() // TODO 试着更优雅重启
} else {
exe, err := os.Executable()
if err != nil {
return err
}
// quit
events.Notify(events.EventQuit)
@@ -250,10 +254,9 @@ func (this *UpgradeManager) restart() error {
events.Notify(events.EventTerminated)
// 启动
exe = filepath.Dir(exe) + "/" + teaconst.ProcessName
var exe = filepath.Dir(this.exe) + "/" + teaconst.ProcessName
var cmd = executils.NewCmd(exe, "start")
err = cmd.Start()
err := cmd.Start()
if err != nil {
return err
}

View File

@@ -7,10 +7,12 @@ import (
"database/sql"
"errors"
"fmt"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils/fileutils"
_ "github.com/mattn/go-sqlite3"
"net/url"
"strings"
"sync"
"time"
@@ -21,6 +23,7 @@ var errDBIsClosed = errors.New("the database is closed")
type DB struct {
locker *fileutils.Locker
rawDB *sql.DB
dsn string
statusLocker sync.Mutex
countUpdating int32
@@ -41,6 +44,10 @@ func OpenReader(dsn string) (*DB, error) {
}
func open(dsn string, lock bool) (*DB, error) {
if teaconst.IsQuiting {
return nil, errors.New("can not open database when process is quiting")
}
// locker
var locker *fileutils.Locker
if lock {
@@ -64,14 +71,15 @@ func open(dsn string, lock bool) (*DB, error) {
return nil, err
}
var db = NewDB(rawDB)
var db = NewDB(rawDB, dsn)
db.locker = locker
return db, nil
}
func NewDB(rawDB *sql.DB) *DB {
func NewDB(rawDB *sql.DB, dsn string) *DB {
var db = &DB{
rawDB: rawDB,
dsn: dsn,
}
events.OnKey(events.EventQuit, fmt.Sprintf("db_%p", db), func() {
@@ -193,6 +201,14 @@ func (this *DB) Close() error {
}
}()
// print log
if len(this.dsn) > 0 {
u, _ := url.Parse(this.dsn)
if u != nil && len(u.Path) > 0 {
remotelogs.Debug("DB", "close '"+u.Path)
}
}
return this.rawDB.Close()
}

View File

@@ -0,0 +1,17 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package dbs_test
import (
"net/url"
"testing"
)
func TestParseDSN(t *testing.T) {
var dsn = "file:/home/cache/p43/.indexes/db-3.db?cache=private&mode=ro&_journal_mode=WAL&_sync=OFF&_cache_size=88000"
u, err := url.Parse(dsn)
if err != nil {
t.Fatal(err)
}
t.Log(u.Path) // expect: :/home/cache/p43/.indexes/db-3.db
}