Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b1a9f9a45 | ||
|
|
7a86ecb44b | ||
|
|
89a113431a | ||
|
|
ce62d0769b | ||
|
|
a72dc2e011 | ||
|
|
91ca2d6b6b | ||
|
|
0de6fa5ce8 | ||
|
|
84e2628769 | ||
|
|
fc28798c9f | ||
|
|
24c21c5513 | ||
|
|
8445e811a5 | ||
|
|
d54621d500 | ||
|
|
ef045e90f2 | ||
|
|
5205136809 | ||
|
|
179a7760fa | ||
|
|
640e69524c | ||
|
|
3620ab3dc6 |
@@ -83,6 +83,20 @@ func main() {
|
||||
}
|
||||
}
|
||||
})
|
||||
app.On("debug", func() {
|
||||
var sock = gosock.NewTmpSock(teaconst.ProcessName)
|
||||
reply, err := sock.Send(&gosock.Command{Code: "debug"})
|
||||
if err != nil {
|
||||
fmt.Println("[ERROR]" + err.Error())
|
||||
} else {
|
||||
var isDebug = maps.NewMap(reply.Params).GetBool("debug")
|
||||
if isDebug {
|
||||
fmt.Println("debug on")
|
||||
} else {
|
||||
fmt.Println("debug off")
|
||||
}
|
||||
}
|
||||
})
|
||||
app.Run(func() {
|
||||
nodes.NewAPINode().Start()
|
||||
})
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
//go:build community
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package accesslogs
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package teaconst
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package teaconst
|
||||
|
||||
const (
|
||||
Version = "0.4.0"
|
||||
Version = "0.4.1"
|
||||
|
||||
ProductName = "Edge API"
|
||||
ProcessName = "edge-api"
|
||||
@@ -18,8 +18,8 @@ const (
|
||||
|
||||
// 其他节点版本号,用来检测是否有需要升级的节点
|
||||
|
||||
NodeVersion = "0.4.0"
|
||||
UserNodeVersion = "0.3.0"
|
||||
NodeVersion = "0.4.1"
|
||||
UserNodeVersion = "0.3.1"
|
||||
AuthorityNodeVersion = "0.0.2"
|
||||
MonitorNodeVersion = "0.0.3"
|
||||
DNSNodeVersion = "0.2.1"
|
||||
|
||||
@@ -6,4 +6,5 @@ var (
|
||||
IsPlus = false
|
||||
MaxNodes int32 = 0
|
||||
NodeId int64 = 0
|
||||
Debug = false
|
||||
)
|
||||
|
||||
76
internal/db/models/api_method_stat_dao.go
Normal file
76
internal/db/models/api_method_stat_dao.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
teaconst "github.com/TeaOSLab/EdgeAPI/internal/const"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
)
|
||||
|
||||
type APIMethodStatDAO dbs.DAO
|
||||
|
||||
func NewAPIMethodStatDAO() *APIMethodStatDAO {
|
||||
return dbs.NewDAO(&APIMethodStatDAO{
|
||||
DAOObject: dbs.DAOObject{
|
||||
DB: Tea.Env,
|
||||
Table: "edgeAPIMethodStats",
|
||||
Model: new(APIMethodStat),
|
||||
PkName: "id",
|
||||
},
|
||||
}).(*APIMethodStatDAO)
|
||||
}
|
||||
|
||||
var SharedAPIMethodStatDAO *APIMethodStatDAO
|
||||
|
||||
func init() {
|
||||
dbs.OnReady(func() {
|
||||
SharedAPIMethodStatDAO = NewAPIMethodStatDAO()
|
||||
})
|
||||
}
|
||||
|
||||
// CreateStat 记录统计数据
|
||||
func (this *APIMethodStatDAO) CreateStat(tx *dbs.Tx, method string, tag string, costMs float64) error {
|
||||
var day = timeutil.Format("Ymd")
|
||||
return this.Query(tx).
|
||||
Param("costMs", costMs).
|
||||
InsertOrUpdateQuickly(map[string]interface{}{
|
||||
"apiNodeId": teaconst.NodeId,
|
||||
"method": method,
|
||||
"tag": tag,
|
||||
"costMs": costMs,
|
||||
"peekMs": costMs,
|
||||
"countCalls": 1,
|
||||
"day": day,
|
||||
}, map[string]interface{}{
|
||||
"costMs": dbs.SQL("(costMs*countCalls+:costMs)/(countCalls+1)"),
|
||||
"peekMs": dbs.SQL("IF(peekMs>:costMs, peekMs, :costMs)"),
|
||||
"countCalls": dbs.SQL("countCalls+1"),
|
||||
})
|
||||
}
|
||||
|
||||
// FindAllStatsWithDay 查询当前统计
|
||||
func (this *APIMethodStatDAO) FindAllStatsWithDay(tx *dbs.Tx, day string) (result []*APIMethodStat, err error) {
|
||||
_, err = this.Query(tx).
|
||||
Attr("day", day).
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
|
||||
// CountAllStatsWithDay 统计当天数量
|
||||
func (this *APIMethodStatDAO) CountAllStatsWithDay(tx *dbs.Tx, day string) (int64, error) {
|
||||
return this.Query(tx).
|
||||
Attr("day", day).
|
||||
Count()
|
||||
}
|
||||
|
||||
// Clean 清理数据
|
||||
func (this *APIMethodStatDAO) Clean(tx *dbs.Tx) error {
|
||||
var day = timeutil.Format("Ymd")
|
||||
_, err := this.Query(tx).
|
||||
Param("day", day).
|
||||
Where("day<:day").
|
||||
Delete()
|
||||
return err
|
||||
}
|
||||
19
internal/db/models/api_method_stat_dao_test.go
Normal file
19
internal/db/models/api_method_stat_dao_test.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
_ "github.com/iwind/TeaGo/bootstrap"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAPIMethodStatDAO_CreateStat(t *testing.T) {
|
||||
var dao = NewAPIMethodStatDAO()
|
||||
var tx *dbs.Tx
|
||||
|
||||
err := dao.CreateStat(tx, "/pb.Hello/World", "tag", 1.123)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
28
internal/db/models/api_method_stat_model.go
Normal file
28
internal/db/models/api_method_stat_model.go
Normal file
@@ -0,0 +1,28 @@
|
||||
package models
|
||||
|
||||
// APIMethodStat API方法统计
|
||||
type APIMethodStat struct {
|
||||
Id uint64 `field:"id"` // ID
|
||||
ApiNodeId uint32 `field:"apiNodeId"` // API节点ID
|
||||
Method string `field:"method"` // 方法
|
||||
Tag string `field:"tag"` // 标签方法
|
||||
CostMs float64 `field:"costMs"` // 耗时Ms
|
||||
PeekMs float64 `field:"peekMs"` // 峰值耗时
|
||||
CountCalls uint64 `field:"countCalls"` // 调用次数
|
||||
Day string `field:"day"` // 日期
|
||||
}
|
||||
|
||||
type APIMethodStatOperator struct {
|
||||
Id interface{} // ID
|
||||
ApiNodeId interface{} // API节点ID
|
||||
Method interface{} // 方法
|
||||
Tag interface{} // 标签方法
|
||||
CostMs interface{} // 耗时Ms
|
||||
PeekMs interface{} // 峰值耗时
|
||||
CountCalls interface{} // 调用次数
|
||||
Day interface{} // 日期
|
||||
}
|
||||
|
||||
func NewAPIMethodStatOperator() *APIMethodStatOperator {
|
||||
return &APIMethodStatOperator{}
|
||||
}
|
||||
1
internal/db/models/api_method_stat_model_ext.go
Normal file
1
internal/db/models/api_method_stat_model_ext.go
Normal file
@@ -0,0 +1 @@
|
||||
package models
|
||||
@@ -62,7 +62,15 @@ func (this *APINodeDAO) DisableAPINode(tx *dbs.Tx, id int64) error {
|
||||
}
|
||||
|
||||
// FindEnabledAPINode 查找启用中的条目
|
||||
func (this *APINodeDAO) FindEnabledAPINode(tx *dbs.Tx, id int64) (*APINode, error) {
|
||||
func (this *APINodeDAO) FindEnabledAPINode(tx *dbs.Tx, id int64, cacheMap *utils.CacheMap) (*APINode, error) {
|
||||
var cacheKey = this.Table + ":FindEnabledAPINode:" + types.String(id)
|
||||
if cacheMap != nil {
|
||||
cache, ok := cacheMap.Get(cacheKey)
|
||||
if ok {
|
||||
return cache.(*APINode), nil
|
||||
}
|
||||
}
|
||||
|
||||
result, err := this.Query(tx).
|
||||
Pk(id).
|
||||
Attr("state", APINodeStateEnabled).
|
||||
@@ -70,6 +78,11 @@ func (this *APINodeDAO) FindEnabledAPINode(tx *dbs.Tx, id int64) (*APINode, erro
|
||||
if result == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cacheMap != nil {
|
||||
cacheMap.Put(cacheKey, result)
|
||||
}
|
||||
|
||||
return result.(*APINode), err
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build community
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package authority
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "", 10, timeutil.Format("Ymd"), 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -80,7 +80,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page(t *testing.T) {
|
||||
times := 0 // 防止循环次数太多
|
||||
for {
|
||||
before := time.Now()
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd"), 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||
cost := time.Since(before).Seconds()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -111,7 +111,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Reverse(t *testing.T) {
|
||||
}
|
||||
|
||||
before := time.Now()
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, true, false, 0, 0, 0, false, 0, "", "", "")
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, "16023261176446590001000000000000003500000004", 2, timeutil.Format("Ymd"), 0, 0, 0, true, false, 0, 0, 0, false, 0, "", "", "")
|
||||
cost := time.Since(before).Seconds()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -136,7 +136,7 @@ func TestHTTPAccessLogDAO_ListAccessLogs_Page_NotExists(t *testing.T) {
|
||||
times := 0 // 防止循环次数太多
|
||||
for {
|
||||
before := time.Now()
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||
accessLogs, requestId, hasMore, err := SharedHTTPAccessLogDAO.ListAccessLogs(tx, lastRequestId, 2, timeutil.Format("Ymd", time.Now().AddDate(0, 0, 1)), 0, 0, 0, false, false, 0, 0, 0, false, 0, "", "", "")
|
||||
cost := time.Since(before).Seconds()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
||||
@@ -2,6 +2,8 @@ package models
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
|
||||
@@ -19,6 +21,20 @@ const (
|
||||
IPItemStateDisabled = 0 // 已禁用
|
||||
)
|
||||
|
||||
func init() {
|
||||
dbs.OnReadyDone(func() {
|
||||
goman.New(func() {
|
||||
var ticker = time.NewTicker(1 * time.Minute)
|
||||
for range ticker.C {
|
||||
err := SharedIPItemDAO.CleanExpiredIPItems(nil)
|
||||
if err != nil {
|
||||
remotelogs.Error("IPItemDAO", "clean expired ip items failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type IPItemType = string
|
||||
|
||||
const (
|
||||
@@ -291,35 +307,6 @@ func (this *IPItemDAO) ListIPItemsWithListId(tx *dbs.Tx, listId int64, keyword s
|
||||
|
||||
// ListIPItemsAfterVersion 根据版本号查找IP列表
|
||||
func (this *IPItemDAO) ListIPItemsAfterVersion(tx *dbs.Tx, version int64, size int64) (result []*IPItem, err error) {
|
||||
// 将过期的设置为已删除,这样是为了在 expiredAt<UNIX_TIMESTAMP()边缘节点让过期的IP有一个执行删除的机会
|
||||
ones, _, err := this.Query(tx).
|
||||
ResultPk().
|
||||
Where("(expiredAt>0 AND expiredAt<=:timestamp)").
|
||||
Param("timestamp", time.Now().Unix()).
|
||||
State(IPItemStateEnabled).
|
||||
Limit(100).
|
||||
FindOnes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, one := range ones {
|
||||
var expiredId = one.GetInt64("id")
|
||||
newVersion, err := SharedIPListDAO.IncreaseVersion(tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = this.Query(tx).
|
||||
Pk(expiredId).
|
||||
Set("state", IPItemStateDisabled).
|
||||
Set("expiredAt", 0).
|
||||
Set("version", newVersion).
|
||||
Update()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
_, err = this.Query(tx).
|
||||
// 这里不要设置状态参数,因为我们要知道哪些是删除的
|
||||
Gt("version", version).
|
||||
@@ -438,6 +425,51 @@ func (this *IPItemDAO) UpdateItemsRead(tx *dbs.Tx) error {
|
||||
UpdateQuickly()
|
||||
}
|
||||
|
||||
// CleanExpiredIPItems 清除过期数据
|
||||
func (this *IPItemDAO) CleanExpiredIPItems(tx *dbs.Tx) error {
|
||||
// 删除 N 天之前过期的数据
|
||||
_, err := this.Query(tx).
|
||||
Where("expiredAt<=:timestamp").
|
||||
State(IPItemStateDisabled).
|
||||
Param("timestamp", time.Now().Unix()-7*86400). // N 天之前过期的
|
||||
Limit(10000). // 限制条数,防止数量过多导致超时
|
||||
Delete()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 将过期的设置为已删除,这样是为了在 expiredAt<UNIX_TIMESTAMP()边缘节点让过期的IP有一个执行删除的机会
|
||||
ones, _, err := this.Query(tx).
|
||||
ResultPk().
|
||||
Where("(expiredAt>0 AND expiredAt<=:timestamp)").
|
||||
Param("timestamp", time.Now().Unix()).
|
||||
State(IPItemStateEnabled).
|
||||
Limit(500).
|
||||
FindOnes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, one := range ones {
|
||||
var expiredId = one.GetInt64("id")
|
||||
newVersion, err := SharedIPListDAO.IncreaseVersion(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 这里不重置过期时间用于清理
|
||||
_, err = this.Query(tx).
|
||||
Pk(expiredId).
|
||||
Set("state", IPItemStateDisabled).
|
||||
Set("version", newVersion).
|
||||
Update()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NotifyUpdate 通知更新
|
||||
func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
|
||||
// 获取ListId
|
||||
@@ -464,7 +496,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
|
||||
return err
|
||||
}
|
||||
for _, clusterId := range clusterIds {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeIPItemChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -472,7 +504,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
|
||||
} else {
|
||||
clusterIds, err := SharedNodeClusterDAO.FindAllEnabledNodeClusterIds(tx)
|
||||
for _, clusterId := range clusterIds {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeIPItemChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -524,7 +556,7 @@ func (this *IPItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64) error {
|
||||
|
||||
if len(resultClusterIds) > 0 {
|
||||
for _, clusterId := range resultClusterIds {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeIPItemChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeIPItemChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestIPItemDAO_NotifyClustersUpdate(t *testing.T) {
|
||||
@@ -27,3 +31,34 @@ func TestIPItemDAO_DisableIPItemsWithListId(t *testing.T) {
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestIPItemDAO_ListIPItemsAfterVersion(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var tx *dbs.Tx
|
||||
_, err := SharedIPItemDAO.ListIPItemsAfterVersion(tx, 0, 100)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestIPItemDAO_CreateManyIPs(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var tx *dbs.Tx
|
||||
var dao = NewIPItemDAO()
|
||||
var n = 10
|
||||
for i := 0; i < n; i++ {
|
||||
itemId, err := dao.CreateIPItem(tx, firewallconfigs.GlobalListId, "192."+types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255))+"."+types.String(rands.Int(0, 255)), "", time.Now().Unix()+86400, "test", IPItemTypeIPv4, "warning", 0, 0, 0, 0, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_ = itemId
|
||||
/**err = dao.Query(tx).Pk(itemId).Set("state", 0).UpdateQuickly()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}**/
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
@@ -284,7 +284,7 @@ func (this *IPListDAO) NotifyUpdate(tx *dbs.Tx, listId int64, taskType NodeTaskT
|
||||
|
||||
if len(resultClusterIds) > 0 {
|
||||
for _, clusterId := range resultClusterIds {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, taskType)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, taskType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -334,7 +334,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
|
||||
return err
|
||||
}
|
||||
for _, clusterId := range clusterIds {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -346,7 +346,7 @@ func (this *MetricItemDAO) NotifyUpdate(tx *dbs.Tx, itemId int64, isPublic bool)
|
||||
return err
|
||||
}
|
||||
for _, clusterId := range clusterIds {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -282,7 +282,7 @@ func (this *NSDomainDAO) NotifyUpdate(tx *dbs.Tx, domainId int64) error {
|
||||
return err
|
||||
}
|
||||
if clusterId > 0 {
|
||||
return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeDomainChanged)
|
||||
return models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeDomainChanged)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -199,7 +199,7 @@ func (this *NSKeyDAO) NotifyUpdate(tx *dbs.Tx, keyId int64) error {
|
||||
return err
|
||||
}
|
||||
if clusterId > 0 {
|
||||
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeKeyChanged)
|
||||
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeKeyChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -279,7 +279,7 @@ func (this *NSRecordDAO) NotifyUpdate(tx *dbs.Tx, recordId int64) error {
|
||||
}
|
||||
|
||||
if clusterId > 0 {
|
||||
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRecordChanged)
|
||||
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeRecordChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -259,7 +259,7 @@ func (this *NSRouteDAO) NotifyUpdate(tx *dbs.Tx) error {
|
||||
return err
|
||||
}
|
||||
for _, clusterId := range clusterIds {
|
||||
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, models.NSNodeTaskTypeRouteChanged)
|
||||
err = models.SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, models.NSNodeTaskTypeRouteChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -275,7 +275,7 @@ func (this *NodeClusterDAO) FindAllAPINodeAddrsWithCluster(tx *dbs.Tx, clusterId
|
||||
return nil, err
|
||||
}
|
||||
for _, apiNodeId := range apiNodeIds {
|
||||
apiNode, err := SharedAPINodeDAO.FindEnabledAPINode(tx, apiNodeId)
|
||||
apiNode, err := SharedAPINodeDAO.FindEnabledAPINode(tx, apiNodeId, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -901,7 +901,7 @@ func (this *NodeClusterDAO) FindClusterBasicInfo(tx *dbs.Tx, clusterId int64, ca
|
||||
|
||||
// NotifyUpdate 通知更新
|
||||
func (this *NodeClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
|
||||
}
|
||||
|
||||
// NotifyDNSUpdate 通知DNS更新
|
||||
|
||||
@@ -177,5 +177,5 @@ func (this *NodeClusterMetricItemDAO) ExistsClusterItem(tx *dbs.Tx, clusterId in
|
||||
|
||||
// NotifyUpdate 通知更新
|
||||
func (this *NodeClusterMetricItemDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, NodeTaskTypeConfigChanged)
|
||||
}
|
||||
|
||||
@@ -518,11 +518,11 @@ func (this *NodeDAO) FindAllEnabledNodeIdsWithClusterId(tx *dbs.Tx, clusterId in
|
||||
func (this *NodeDAO) FindAllInactiveNodesWithClusterId(tx *dbs.Tx, clusterId int64) (result []*Node, err error) {
|
||||
_, err = this.Query(tx).
|
||||
State(NodeStateEnabled).
|
||||
Result("id", "name", "status").
|
||||
Attr("clusterId", clusterId).
|
||||
Attr("isOn", true). // 只监控启用的节点
|
||||
Attr("isInstalled", true). // 只监控已经安装的节点
|
||||
Attr("isActive", true). // 当前已经在线的
|
||||
Where("(status IS NULL OR (JSON_EXTRACT(status, '$.isActive')=false AND UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>10) OR UNIX_TIMESTAMP()-JSON_EXTRACT(status, '$.updatedAt')>120)").
|
||||
Attr("isActive", false).
|
||||
Result("id", "name").
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
@@ -1471,7 +1471,7 @@ func (this *NodeDAO) NotifyUpdate(tx *dbs.Tx, nodeId int64) error {
|
||||
return err
|
||||
}
|
||||
if clusterId > 0 {
|
||||
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, NodeTaskTypeConfigChanged, 0)
|
||||
return SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, 0, NodeTaskTypeConfigChanged, 0)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build community
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package models
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package models
|
||||
|
||||
// 区域计费设置
|
||||
// NodePriceItem 区域计费设置
|
||||
type NodePriceItem struct {
|
||||
Id uint32 `field:"id"` // ID
|
||||
IsOn uint8 `field:"isOn"` // 是否启用
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package models
|
||||
|
||||
// 节点区域
|
||||
// NodeRegion 节点区域
|
||||
type NodeRegion struct {
|
||||
Id uint32 `field:"id"` // ID
|
||||
AdminId uint32 `field:"adminId"` // 管理员ID
|
||||
|
||||
@@ -1 +1,18 @@
|
||||
package models
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
func (this *NodeRegion) DecodePriceMap() map[int64]float64 {
|
||||
var m = map[int64]float64{}
|
||||
if len(this.Prices) == 0 {
|
||||
return m
|
||||
}
|
||||
|
||||
err := json.Unmarshal([]byte(this.Prices), &m)
|
||||
if err != nil {
|
||||
// 忽略错误
|
||||
return m
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func init() {
|
||||
}
|
||||
|
||||
// CreateNodeTask 创建单个节点任务
|
||||
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, taskType NodeTaskType, version int64) error {
|
||||
func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64, nodeId int64, serverId int64, taskType NodeTaskType, version int64) error {
|
||||
if clusterId <= 0 || nodeId <= 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -60,6 +60,7 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
|
||||
"role": role,
|
||||
"clusterId": clusterId,
|
||||
"nodeId": nodeId,
|
||||
"serverId": serverId,
|
||||
"type": taskType,
|
||||
"uniqueId": uniqueId,
|
||||
"updatedAt": updatedAt,
|
||||
@@ -80,17 +81,18 @@ func (this *NodeTaskDAO) CreateNodeTask(tx *dbs.Tx, role string, clusterId int64
|
||||
}
|
||||
|
||||
// CreateClusterTask 创建集群任务
|
||||
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, taskType NodeTaskType) error {
|
||||
func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId int64, serverId int64, taskType NodeTaskType) error {
|
||||
if clusterId <= 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
uniqueId := role + "@" + types.String(clusterId) + "@cluster@" + taskType
|
||||
uniqueId := role + "@" + types.String(clusterId) + "@" + types.String(serverId) + "@cluster@" + taskType
|
||||
updatedAt := time.Now().Unix()
|
||||
_, _, err := this.Query(tx).
|
||||
InsertOrUpdate(maps.Map{
|
||||
"role": role,
|
||||
"clusterId": clusterId,
|
||||
"serverId": serverId,
|
||||
"nodeId": 0,
|
||||
"type": taskType,
|
||||
"uniqueId": uniqueId,
|
||||
@@ -112,7 +114,7 @@ func (this *NodeTaskDAO) CreateClusterTask(tx *dbs.Tx, role string, clusterId in
|
||||
}
|
||||
|
||||
// ExtractNodeClusterTask 分解边缘节点集群任务
|
||||
func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, taskType NodeTaskType) error {
|
||||
func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, serverId int64, taskType NodeTaskType) error {
|
||||
nodeIds, err := SharedNodeDAO.FindAllNodeIdsMatch(tx, clusterId, true, configutils.BoolStateYes)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -131,7 +133,7 @@ func (this *NodeTaskDAO) ExtractNodeClusterTask(tx *dbs.Tx, clusterId int64, tas
|
||||
|
||||
var version = time.Now().UnixNano()
|
||||
for _, nodeId := range nodeIds {
|
||||
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, taskType, version)
|
||||
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, clusterId, nodeId, serverId, taskType, version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -170,7 +172,7 @@ func (this *NodeTaskDAO) ExtractNSClusterTask(tx *dbs.Tx, clusterId int64, taskT
|
||||
|
||||
var version = time.Now().UnixNano()
|
||||
for _, nodeId := range nodeIds {
|
||||
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, taskType, version)
|
||||
err = this.CreateNodeTask(tx, nodeconfigs.NodeRoleDNS, clusterId, nodeId, 0, taskType, version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -202,7 +204,8 @@ func (this *NodeTaskDAO) ExtractAllClusterTasks(tx *dbs.Tx, role string) error {
|
||||
clusterId := int64(one.(*NodeTask).ClusterId)
|
||||
switch role {
|
||||
case nodeconfigs.NodeRoleNode:
|
||||
err = this.ExtractNodeClusterTask(tx, clusterId, one.(*NodeTask).Type)
|
||||
var nodeTask = one.(*NodeTask)
|
||||
err = this.ExtractNodeClusterTask(tx, clusterId, int64(nodeTask.ServerId), nodeTask.Type)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ func TestNodeTaskDAO_CreateNodeTask(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var tx *dbs.Tx
|
||||
err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, NodeTaskTypeConfigChanged, 0)
|
||||
err := SharedNodeTaskDAO.CreateNodeTask(tx, nodeconfigs.NodeRoleNode, 1, 2, 0, NodeTaskTypeConfigChanged, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -22,7 +22,7 @@ func TestNodeTaskDAO_CreateClusterTask(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var tx *dbs.Tx
|
||||
err := SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, 1, NodeTaskTypeConfigChanged)
|
||||
err := SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, 1, 0, NodeTaskTypeConfigChanged)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -33,7 +33,7 @@ func TestNodeTaskDAO_ExtractClusterTask(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
var tx *dbs.Tx
|
||||
err := SharedNodeTaskDAO.ExtractNodeClusterTask(tx, 1, NodeTaskTypeConfigChanged)
|
||||
err := SharedNodeTaskDAO.ExtractNodeClusterTask(tx, 1, 0, NodeTaskTypeConfigChanged)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ type NodeTask struct {
|
||||
Role string `field:"role"` // 节点角色
|
||||
NodeId uint32 `field:"nodeId"` // 节点ID
|
||||
ClusterId uint32 `field:"clusterId"` // 集群ID
|
||||
ServerId uint32 `field:"serverId"` // 服务ID
|
||||
Type string `field:"type"` // 任务类型
|
||||
UniqueId string `field:"uniqueId"` // 唯一ID:nodeId@type
|
||||
UpdatedAt uint64 `field:"updatedAt"` // 修改时间
|
||||
@@ -21,6 +22,7 @@ type NodeTaskOperator struct {
|
||||
Role interface{} // 节点角色
|
||||
NodeId interface{} // 节点ID
|
||||
ClusterId interface{} // 集群ID
|
||||
ServerId interface{} // 服务ID
|
||||
Type interface{} // 任务类型
|
||||
UniqueId interface{} // 唯一ID:nodeId@type
|
||||
UpdatedAt interface{} // 修改时间
|
||||
|
||||
@@ -195,5 +195,5 @@ func (this *NSClusterDAO) FindClusterRecursion(tx *dbs.Tx, clusterId int64) ([]b
|
||||
|
||||
// NotifyUpdate 通知更改
|
||||
func (this *NSClusterDAO) NotifyUpdate(tx *dbs.Tx, clusterId int64) error {
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, NSNodeTaskTypeConfigChanged)
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleDNS, clusterId, 0, NSNodeTaskTypeConfigChanged)
|
||||
}
|
||||
|
||||
@@ -80,7 +80,17 @@ func (this *PlanDAO) FindPlanName(tx *dbs.Tx, id int64) (string, error) {
|
||||
}
|
||||
|
||||
// CreatePlan 创建套餐
|
||||
func (this *PlanDAO) CreatePlan(tx *dbs.Tx, name string, clusterId int64, trafficLimitJSON []byte, featuresJSON []byte, priceType serverconfigs.PlanPriceType, trafficPriceJSON []byte, monthlyPrice float32, seasonallyPrice float32, yearlyPrice float32) (int64, error) {
|
||||
func (this *PlanDAO) CreatePlan(tx *dbs.Tx,
|
||||
name string,
|
||||
clusterId int64,
|
||||
trafficLimitJSON []byte,
|
||||
featuresJSON []byte,
|
||||
priceType serverconfigs.PlanPriceType,
|
||||
trafficPriceJSON []byte,
|
||||
bandwidthPriceJSON []byte,
|
||||
monthlyPrice float32,
|
||||
seasonallyPrice float32,
|
||||
yearlyPrice float32) (int64, error) {
|
||||
var op = NewPlanOperator()
|
||||
op.Name = name
|
||||
op.ClusterId = clusterId
|
||||
@@ -94,6 +104,9 @@ func (this *PlanDAO) CreatePlan(tx *dbs.Tx, name string, clusterId int64, traffi
|
||||
if len(trafficPriceJSON) > 0 {
|
||||
op.TrafficPrice = trafficPriceJSON
|
||||
}
|
||||
if len(bandwidthPriceJSON) > 0 {
|
||||
op.BandwidthPrice = bandwidthPriceJSON
|
||||
}
|
||||
if monthlyPrice >= 0 {
|
||||
op.MonthlyPrice = monthlyPrice
|
||||
}
|
||||
@@ -109,7 +122,19 @@ func (this *PlanDAO) CreatePlan(tx *dbs.Tx, name string, clusterId int64, traffi
|
||||
}
|
||||
|
||||
// UpdatePlan 修改套餐
|
||||
func (this *PlanDAO) UpdatePlan(tx *dbs.Tx, planId int64, name string, isOn bool, clusterId int64, trafficLimitJSON []byte, featuresJSON []byte, priceType serverconfigs.PlanPriceType, trafficPriceJSON []byte, monthlyPrice float32, seasonallyPrice float32, yearlyPrice float32) error {
|
||||
func (this *PlanDAO) UpdatePlan(tx *dbs.Tx,
|
||||
planId int64,
|
||||
name string,
|
||||
isOn bool,
|
||||
clusterId int64,
|
||||
trafficLimitJSON []byte,
|
||||
featuresJSON []byte,
|
||||
priceType serverconfigs.PlanPriceType,
|
||||
trafficPriceJSON []byte,
|
||||
bandwidthPriceJSON []byte,
|
||||
monthlyPrice float32,
|
||||
seasonallyPrice float32,
|
||||
yearlyPrice float32) error {
|
||||
if planId <= 0 {
|
||||
return errors.New("invalid planId")
|
||||
}
|
||||
@@ -138,6 +163,9 @@ func (this *PlanDAO) UpdatePlan(tx *dbs.Tx, planId int64, name string, isOn bool
|
||||
if len(trafficPriceJSON) > 0 {
|
||||
op.TrafficPrice = trafficPriceJSON
|
||||
}
|
||||
if len(bandwidthPriceJSON) > 0 {
|
||||
op.BandwidthPrice = bandwidthPriceJSON
|
||||
}
|
||||
if monthlyPrice >= 0 {
|
||||
op.MonthlyPrice = monthlyPrice
|
||||
} else {
|
||||
|
||||
@@ -9,6 +9,7 @@ type Plan struct {
|
||||
TrafficLimit string `field:"trafficLimit"` // 流量限制
|
||||
Features string `field:"features"` // 允许的功能
|
||||
TrafficPrice string `field:"trafficPrice"` // 流量价格设定
|
||||
BandwidthPrice string `field:"bandwidthPrice"` // 带宽价格
|
||||
MonthlyPrice float64 `field:"monthlyPrice"` // 月付
|
||||
SeasonallyPrice float64 `field:"seasonallyPrice"` // 季付
|
||||
YearlyPrice float64 `field:"yearlyPrice"` // 年付
|
||||
@@ -25,6 +26,7 @@ type PlanOperator struct {
|
||||
TrafficLimit interface{} // 流量限制
|
||||
Features interface{} // 允许的功能
|
||||
TrafficPrice interface{} // 流量价格设定
|
||||
BandwidthPrice interface{} // 带宽价格
|
||||
MonthlyPrice interface{} // 月付
|
||||
SeasonallyPrice interface{} // 季付
|
||||
YearlyPrice interface{} // 年付
|
||||
|
||||
@@ -1 +1,38 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
)
|
||||
|
||||
// DecodeTrafficPrice 流量价格配置
|
||||
func (this *Plan) DecodeTrafficPrice() *serverconfigs.PlanTrafficPriceConfig {
|
||||
var config = &serverconfigs.PlanTrafficPriceConfig{}
|
||||
|
||||
if len(this.TrafficPrice) == 0 {
|
||||
return config
|
||||
}
|
||||
|
||||
err := json.Unmarshal([]byte(this.TrafficPrice), config)
|
||||
if err != nil {
|
||||
// 忽略错误
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
// DecodeBandwidthPrice 带宽价格配置
|
||||
func (this *Plan) DecodeBandwidthPrice() *serverconfigs.PlanBandwidthPriceConfig {
|
||||
var config = &serverconfigs.PlanBandwidthPriceConfig{}
|
||||
|
||||
if len(this.BandwidthPrice) == 0 {
|
||||
return config
|
||||
}
|
||||
|
||||
err := json.Unmarshal([]byte(this.BandwidthPrice), config)
|
||||
if err != nil {
|
||||
// 忽略错误
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
108
internal/db/models/server_bill_dao.go
Normal file
108
internal/db/models/server_bill_dao.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"math"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ServerBillDAO dbs.DAO
|
||||
|
||||
func NewServerBillDAO() *ServerBillDAO {
|
||||
return dbs.NewDAO(&ServerBillDAO{
|
||||
DAOObject: dbs.DAOObject{
|
||||
DB: Tea.Env,
|
||||
Table: "edgeServerBills",
|
||||
Model: new(ServerBill),
|
||||
PkName: "id",
|
||||
},
|
||||
}).(*ServerBillDAO)
|
||||
}
|
||||
|
||||
var SharedServerBillDAO *ServerBillDAO
|
||||
|
||||
func init() {
|
||||
dbs.OnReady(func() {
|
||||
SharedServerBillDAO = NewServerBillDAO()
|
||||
})
|
||||
}
|
||||
|
||||
// CreateOrUpdateServerBill 创建账单
|
||||
func (this *ServerBillDAO) CreateOrUpdateServerBill(tx *dbs.Tx,
|
||||
userId int64,
|
||||
serverId int64,
|
||||
month string,
|
||||
userPlanId int64,
|
||||
planId int64,
|
||||
totalTrafficBytes int64,
|
||||
bandwidthPercentileBytes int64,
|
||||
bandwidthPercentile int,
|
||||
priceType string,
|
||||
fee float64) error {
|
||||
fee = math.Floor(fee*100) / 100
|
||||
return this.Query(tx).
|
||||
InsertOrUpdateQuickly(maps.Map{
|
||||
"userId": userId,
|
||||
"serverId": serverId,
|
||||
"month": month,
|
||||
"priceType": priceType,
|
||||
"amount": fee,
|
||||
"userPlanId": userPlanId,
|
||||
"planId": planId,
|
||||
"totalTrafficBytes": totalTrafficBytes,
|
||||
"bandwidthPercentileBytes": bandwidthPercentileBytes,
|
||||
"bandwidthPercentile": bandwidthPercentile,
|
||||
"createdAt": time.Now().Unix(),
|
||||
}, maps.Map{
|
||||
"userId": userId,
|
||||
"priceType": priceType,
|
||||
"amount": fee,
|
||||
"userPlanId": userPlanId,
|
||||
"planId": planId,
|
||||
"totalTrafficBytes": totalTrafficBytes,
|
||||
"bandwidthPercentileBytes": bandwidthPercentileBytes,
|
||||
"bandwidthPercentile": bandwidthPercentile,
|
||||
"createdAt": time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
|
||||
// SumUserMonthlyAmount 计算总费用
|
||||
func (this *ServerBillDAO) SumUserMonthlyAmount(tx *dbs.Tx, userId int64, month string) (float64, error) {
|
||||
return this.Query(tx).
|
||||
Attr("userId", userId).
|
||||
Attr("month", month).
|
||||
Sum("amount", 0)
|
||||
}
|
||||
|
||||
// CountServerBills 计算总账单数量
|
||||
func (this *ServerBillDAO) CountServerBills(tx *dbs.Tx, userId int64, month string) (int64, error) {
|
||||
var query = this.Query(tx)
|
||||
if userId > 0 {
|
||||
query.Attr("userId", userId)
|
||||
}
|
||||
if len(month) > 0 {
|
||||
query.Attr("month", month)
|
||||
}
|
||||
return query.Count()
|
||||
}
|
||||
|
||||
// ListServerBills 列出单页账单
|
||||
func (this *ServerBillDAO) ListServerBills(tx *dbs.Tx, userId int64, month string, offset int64, size int64) (result []*ServerBill, err error) {
|
||||
var query = this.Query(tx)
|
||||
if userId > 0 {
|
||||
query.Attr("userId", userId)
|
||||
}
|
||||
if len(month) > 0 {
|
||||
query.Attr("month", month)
|
||||
}
|
||||
_, err = query.
|
||||
Desc("serverId").
|
||||
Offset(offset).
|
||||
Limit(size).
|
||||
Slice(&result).
|
||||
FindAll()
|
||||
return
|
||||
}
|
||||
20
internal/db/models/server_bill_dao_test.go
Normal file
20
internal/db/models/server_bill_dao_test.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
_ "github.com/iwind/TeaGo/bootstrap"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestServerBillDAO_CreateOrUpdateServerBill(t *testing.T) {
|
||||
var dao = NewServerBillDAO()
|
||||
var tx *dbs.Tx
|
||||
var month = timeutil.Format("Y02")
|
||||
err := dao.CreateOrUpdateServerBill(tx, 1, 2, month, 4, 5, 6, 7, 95, "", 100)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
36
internal/db/models/server_bill_model.go
Normal file
36
internal/db/models/server_bill_model.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package models
|
||||
|
||||
// ServerBill 服务账单
|
||||
type ServerBill struct {
|
||||
Id uint64 `field:"id"` // ID
|
||||
UserId uint32 `field:"userId"` // 用户ID
|
||||
ServerId uint32 `field:"serverId"` // 服务ID
|
||||
Amount float64 `field:"amount"` // 金额
|
||||
Month string `field:"month"` // 月份
|
||||
CreatedAt uint64 `field:"createdAt"` // 创建时间
|
||||
UserPlanId uint32 `field:"userPlanId"` // 用户套餐ID
|
||||
PlanId uint32 `field:"planId"` // 套餐ID
|
||||
TotalTrafficBytes uint64 `field:"totalTrafficBytes"` // 总流量
|
||||
BandwidthPercentileBytes uint64 `field:"bandwidthPercentileBytes"` // 带宽百分位字节
|
||||
BandwidthPercentile uint8 `field:"bandwidthPercentile"` // 带宽百分位
|
||||
PriceType string `field:"priceType"` // 计费类型
|
||||
}
|
||||
|
||||
type ServerBillOperator struct {
|
||||
Id interface{} // ID
|
||||
UserId interface{} // 用户ID
|
||||
ServerId interface{} // 服务ID
|
||||
Amount interface{} // 金额
|
||||
Month interface{} // 月份
|
||||
CreatedAt interface{} // 创建时间
|
||||
UserPlanId interface{} // 用户套餐ID
|
||||
PlanId interface{} // 套餐ID
|
||||
TotalTrafficBytes interface{} // 总流量
|
||||
BandwidthPercentileBytes interface{} // 带宽百分位字节
|
||||
BandwidthPercentile interface{} // 带宽百分位
|
||||
PriceType interface{} // 计费类型
|
||||
}
|
||||
|
||||
func NewServerBillOperator() *ServerBillOperator {
|
||||
return &ServerBillOperator{}
|
||||
}
|
||||
1
internal/db/models/server_bill_model_ext.go
Normal file
1
internal/db/models/server_bill_model_ext.go
Normal file
@@ -0,0 +1 @@
|
||||
package models
|
||||
@@ -12,7 +12,9 @@ import (
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -24,7 +26,7 @@ func init() {
|
||||
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
||||
goman.New(func() {
|
||||
for range ticker.C {
|
||||
err := SharedServerDailyStatDAO.Clean(nil, 30) // 只保留N天
|
||||
err := SharedServerDailyStatDAO.Clean(nil, 60) // 只保留 N 天,时间需要长一些,因为需要用来生成账单
|
||||
if err != nil {
|
||||
logs.Println("ServerDailyStatDAO", "clean expired data failed: "+err.Error())
|
||||
}
|
||||
@@ -130,15 +132,15 @@ func (this *ServerDailyStatDAO) SaveStats(tx *dbs.Tx, stats []*pb.ServerDailySta
|
||||
return nil
|
||||
}
|
||||
|
||||
// SumUserMonthly 根据用户计算某月合计
|
||||
// SumServerMonthlyWithRegion 根据服务计算某月合计
|
||||
// month 格式为YYYYMM
|
||||
func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) {
|
||||
func (this *ServerDailyStatDAO) SumServerMonthlyWithRegion(tx *dbs.Tx, serverId int64, regionId int64, month string) (int64, error) {
|
||||
query := this.Query(tx)
|
||||
if regionId > 0 {
|
||||
query.Attr("regionId", regionId)
|
||||
}
|
||||
return query.Between("day", month+"01", month+"32").
|
||||
Attr("userId", userId).
|
||||
Attr("serverId", serverId).
|
||||
SumInt64("bytes", 0)
|
||||
}
|
||||
|
||||
@@ -156,16 +158,6 @@ func (this *ServerDailyStatDAO) SumUserMonthlyWithoutPlan(tx *dbs.Tx, userId int
|
||||
SumInt64("bytes", 0)
|
||||
}
|
||||
|
||||
// SumUserMonthlyFee 计算用户某个月费用
|
||||
// month 格式为YYYYMM
|
||||
func (this *ServerDailyStatDAO) SumUserMonthlyFee(tx *dbs.Tx, userId int64, month string) (float64, error) {
|
||||
return this.Query(tx).
|
||||
Attr("userId", userId).
|
||||
Between("day", month+"01", month+"32").
|
||||
Gt("fee", 0).
|
||||
Sum("fee", 0)
|
||||
}
|
||||
|
||||
// SumUserMonthlyPeek 获取某月带宽峰值
|
||||
// month 格式为YYYYMM
|
||||
func (this *ServerDailyStatDAO) SumUserMonthlyPeek(tx *dbs.Tx, userId int64, regionId int64, month string) (int64, error) {
|
||||
@@ -195,6 +187,15 @@ func (this *ServerDailyStatDAO) SumUserDaily(tx *dbs.Tx, userId int64, regionId
|
||||
SumInt64("bytes", 0)
|
||||
}
|
||||
|
||||
// SumUserMonthly 获取某月流量总和
|
||||
// month 格式为YYYYMM
|
||||
func (this *ServerDailyStatDAO) SumUserMonthly(tx *dbs.Tx, userId int64, month string) (int64, error) {
|
||||
return this.Query(tx).
|
||||
Between("day", month+"01", month+"31").
|
||||
Attr("userId", userId).
|
||||
SumInt64("bytes", 0)
|
||||
}
|
||||
|
||||
// SumUserDailyPeek 获取某天带宽峰值
|
||||
// day 格式为YYYYMMDD
|
||||
func (this *ServerDailyStatDAO) SumUserDailyPeek(tx *dbs.Tx, userId int64, regionId int64, day string) (int64, error) {
|
||||
@@ -339,6 +340,59 @@ func (this *ServerDailyStatDAO) SumMonthlyStat(tx *dbs.Tx, serverId int64, month
|
||||
return
|
||||
}
|
||||
|
||||
// SumMonthlyBytes 获取某月内的流量
|
||||
// month 格式为YYYYMM
|
||||
func (this *ServerDailyStatDAO) SumMonthlyBytes(tx *dbs.Tx, serverId int64, month string) (result int64, err error) {
|
||||
if !regexp.MustCompile(`^\d{6}$`).MatchString(month) {
|
||||
return
|
||||
}
|
||||
|
||||
return this.Query(tx).
|
||||
Result("SUM(bytes) AS bytes").
|
||||
Attr("serverId", serverId).
|
||||
Between("day", month+"01", month+"31").
|
||||
FindInt64Col(0)
|
||||
}
|
||||
|
||||
// FindMonthlyPercentile 获取某月内百分位
|
||||
func (this *ServerDailyStatDAO) FindMonthlyPercentile(tx *dbs.Tx, serverId int64, month string, percentile int) (result int64, err error) {
|
||||
if percentile <= 0 {
|
||||
percentile = 95
|
||||
}
|
||||
if percentile > 100 {
|
||||
percentile = 100
|
||||
}
|
||||
|
||||
total, err := this.Query(tx).
|
||||
Attr("serverId", serverId).
|
||||
Between("day", month+"01", month+"31").
|
||||
Count()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if total == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
var offset int64
|
||||
|
||||
if total > 1 {
|
||||
offset = int64(math.Ceil(float64(total) * float64(100-percentile) / 100))
|
||||
}
|
||||
result, err = this.Query(tx).
|
||||
Result("bytes").
|
||||
Attr("serverId", serverId).
|
||||
Between("day", month+"01", month+"31").
|
||||
Desc("bytes").
|
||||
Offset(offset).
|
||||
Limit(1).
|
||||
FindInt64Col(0)
|
||||
|
||||
// 因为是5分钟统计,所以需要除以300
|
||||
result = result / 300
|
||||
return
|
||||
}
|
||||
|
||||
// FindDailyStats 按天统计
|
||||
func (this *ServerDailyStatDAO) FindDailyStats(tx *dbs.Tx, serverId int64, dayFrom string, dayTo string) (result []*ServerDailyStat, err error) {
|
||||
ones, err := this.Query(tx).
|
||||
@@ -428,6 +482,25 @@ func (this *ServerDailyStatDAO) FindTopUserStats(tx *dbs.Tx, hourFrom string, ho
|
||||
return
|
||||
}
|
||||
|
||||
// FindDistinctServerIds 查找所有有流量的服务ID列表
|
||||
// dayFrom YYYYMMDD
|
||||
// dayTo YYYYMMDD
|
||||
func (this *ServerDailyStatDAO) FindDistinctServerIds(tx *dbs.Tx, dayFrom string, dayTo string) (serverIds []int64, err error) {
|
||||
dayFrom = strings.ReplaceAll(dayFrom, "-", "")
|
||||
dayTo = strings.ReplaceAll(dayTo, "-", "")
|
||||
ones, _, err := this.Query(tx).
|
||||
Result("DISTINCT(serverId) AS serverId").
|
||||
Between("day", dayFrom, dayTo).
|
||||
FindOnes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, one := range ones {
|
||||
serverIds = append(serverIds, one.GetInt64("serverId"))
|
||||
}
|
||||
return serverIds, nil
|
||||
}
|
||||
|
||||
// UpdateStatFee 设置费用
|
||||
func (this *ServerDailyStatDAO) UpdateStatFee(tx *dbs.Tx, statId int64, fee float32) error {
|
||||
return this.Query(tx).
|
||||
|
||||
@@ -46,7 +46,7 @@ func TestServerDailyStatDAO_SaveStats2(t *testing.T) {
|
||||
func TestServerDailyStatDAO_SumUserMonthly(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
var tx *dbs.Tx
|
||||
bytes, err := NewServerDailyStatDAO().SumUserMonthly(tx, 1, 1, timeutil.Format("Ym"))
|
||||
bytes, err := NewServerDailyStatDAO().SumUserMonthly(tx, 1, timeutil.Format("Ym"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -68,9 +68,28 @@ func TestServerDailyStatDAO_SumMinutelyRequests(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
var tx *dbs.Tx
|
||||
|
||||
stat, err := NewServerDailyStatDAO().SumMinutelyStat(tx, 23, timeutil.Format("Ymd") + "1435")
|
||||
stat, err := NewServerDailyStatDAO().SumMinutelyStat(tx, 23, timeutil.Format("Ymd")+"1435")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
logs.PrintAsJSON(stat, t)
|
||||
}
|
||||
|
||||
func TestServerDailyStatDAO_FindDistinctPlanServerIdsBetweenDay(t *testing.T) {
|
||||
var tx *dbs.Tx
|
||||
serverIds, err := NewServerDailyStatDAO().FindDistinctServerIds(tx, timeutil.Format("Ym01"), timeutil.Format("Ymd"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(serverIds)
|
||||
}
|
||||
|
||||
func TestServerDailyStatDAO_FindMonthlyPercentile(t *testing.T) {
|
||||
var tx *dbs.Tx
|
||||
var dao = NewServerDailyStatDAO()
|
||||
result, err := dao.FindMonthlyPercentile(tx, 23, timeutil.Format("Ym"), 95)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("result:", result)
|
||||
}
|
||||
|
||||
@@ -221,6 +221,7 @@ func (this *ServerDAO) CreateServer(tx *dbs.Tx,
|
||||
op.DnsName = dnsName
|
||||
|
||||
op.UserPlanId = userPlanId
|
||||
op.LastUserPlanId = userPlanId
|
||||
|
||||
op.Version = 1
|
||||
op.IsOn = 1
|
||||
@@ -1500,11 +1501,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
|
||||
}
|
||||
|
||||
if oldClusterId > 0 {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeConfigChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, 0, NodeTaskTypeConfigChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, NodeTaskTypeIPItemChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, oldClusterId, 0, NodeTaskTypeIPItemChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1515,11 +1516,11 @@ func (this *ServerDAO) UpdateUserServersClusterId(tx *dbs.Tx, userId int64, oldC
|
||||
}
|
||||
|
||||
if newClusterId > 0 {
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeConfigChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, 0, NodeTaskTypeConfigChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, NodeTaskTypeIPItemChanged)
|
||||
err = SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, newClusterId, 0, NodeTaskTypeIPItemChanged)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -2242,6 +2243,7 @@ func (this *ServerDAO) UpdateServerUserPlanId(tx *dbs.Tx, serverId int64, userPl
|
||||
err = this.Query(tx).
|
||||
Pk(serverId).
|
||||
Set("userPlanId", userPlanId).
|
||||
Set("lastUserPlanId", userPlanId).
|
||||
Set("clusterId", plan.ClusterId).
|
||||
UpdateQuickly()
|
||||
if err != nil {
|
||||
@@ -2250,6 +2252,19 @@ func (this *ServerDAO) UpdateServerUserPlanId(tx *dbs.Tx, serverId int64, userPl
|
||||
return this.NotifyUpdate(tx, serverId)
|
||||
}
|
||||
|
||||
// FindServerLastUserPlanIdAndUserId 查找最后使用的套餐
|
||||
func (this *ServerDAO) FindServerLastUserPlanIdAndUserId(tx *dbs.Tx, serverId int64) (userPlanId int64, userId int64, err error) {
|
||||
one, err := this.Query(tx).
|
||||
Pk(serverId).
|
||||
Result("lastUserPlanId", "userId").
|
||||
Find()
|
||||
if err != nil || one == nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
return int64(one.(*Server).LastUserPlanId), int64(one.(*Server).UserId), nil
|
||||
}
|
||||
|
||||
// NotifyUpdate 同步集群
|
||||
func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
|
||||
// 创建任务
|
||||
@@ -2260,7 +2275,7 @@ func (this *ServerDAO) NotifyUpdate(tx *dbs.Tx, serverId int64) error {
|
||||
if clusterId == 0 {
|
||||
return nil
|
||||
}
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, NodeTaskTypeConfigChanged)
|
||||
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, serverId, NodeTaskTypeConfigChanged)
|
||||
}
|
||||
|
||||
// NotifyDNSUpdate 通知DNS更新
|
||||
|
||||
@@ -43,6 +43,7 @@ type Server struct {
|
||||
TrafficLimitStatus string `field:"trafficLimitStatus"` // 流量限制状态
|
||||
TotalTraffic float64 `field:"totalTraffic"` // 总流量
|
||||
UserPlanId uint32 `field:"userPlanId"` // 所属套餐ID
|
||||
LastUserPlanId uint32 `field:"lastUserPlanId"` // 上一次使用的套餐
|
||||
}
|
||||
|
||||
type ServerOperator struct {
|
||||
@@ -87,6 +88,7 @@ type ServerOperator struct {
|
||||
TrafficLimitStatus interface{} // 流量限制状态
|
||||
TotalTraffic interface{} // 总流量
|
||||
UserPlanId interface{} // 所属套餐ID
|
||||
LastUserPlanId interface{} // 上一次使用的套餐
|
||||
}
|
||||
|
||||
func NewServerOperator() *ServerOperator {
|
||||
|
||||
@@ -25,7 +25,7 @@ func init() {
|
||||
var ticker = time.NewTicker(time.Duration(rands.Int(24, 48)) * time.Hour)
|
||||
goman.New(func() {
|
||||
for range ticker.C {
|
||||
err := SharedServerDomainHourlyStatDAO.Clean(nil, 7) // 只保留7天
|
||||
err := SharedServerDomainHourlyStatDAO.Clean(nil, 7) // 只保留 N 天
|
||||
if err != nil {
|
||||
remotelogs.Error("ServerDomainHourlyStatDAO", "clean expired data failed: "+err.Error())
|
||||
}
|
||||
@@ -374,7 +374,9 @@ func (this *ServerDomainHourlyStatDAO) Clean(tx *dbs.Tx, days int) error {
|
||||
Table(table).
|
||||
Lt("hour", hour).
|
||||
Delete()
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ func init() {
|
||||
})
|
||||
}
|
||||
|
||||
// 开锁
|
||||
// Lock 开锁
|
||||
func (this *SysLockerDAO) Lock(tx *dbs.Tx, key string, timeout int64) (ok bool, err error) {
|
||||
maxErrors := 5
|
||||
for {
|
||||
@@ -103,7 +103,7 @@ func (this *SysLockerDAO) Lock(tx *dbs.Tx, key string, timeout int64) (ok bool,
|
||||
}
|
||||
}
|
||||
|
||||
// 解锁
|
||||
// Unlock 解锁
|
||||
func (this *SysLockerDAO) Unlock(tx *dbs.Tx, key string) error {
|
||||
_, err := this.Query(tx).
|
||||
Attr("key", key).
|
||||
@@ -112,7 +112,7 @@ func (this *SysLockerDAO) Unlock(tx *dbs.Tx, key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// 增加版本号
|
||||
// Increase 增加版本号
|
||||
func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (int64, error) {
|
||||
if tx == nil {
|
||||
var result int64
|
||||
|
||||
@@ -158,6 +158,24 @@ func (this *SysSettingDAO) ReadUserServerConfig(tx *dbs.Tx) (*userconfigs.UserSe
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// ReadUserFinanceConfig 读取用户服务配置
|
||||
func (this *SysSettingDAO) ReadUserFinanceConfig(tx *dbs.Tx) (*userconfigs.UserFinanceConfig, error) {
|
||||
valueJSON, err := this.ReadSetting(tx, systemconfigs.SettingCodeUserFinanceConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(valueJSON) == 0 {
|
||||
return userconfigs.DefaultUserFinanceConfig(), nil
|
||||
}
|
||||
|
||||
var config = userconfigs.DefaultUserFinanceConfig()
|
||||
err = json.Unmarshal(valueJSON, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// ReadAdminUIConfig 读取管理员界面配置
|
||||
func (this *SysSettingDAO) ReadAdminUIConfig(tx *dbs.Tx, cacheMap *utils.CacheMap) (*systemconfigs.AdminUIConfig, error) {
|
||||
var cacheKey = this.Table + ":ReadAdminUIConfig"
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils/numberutils"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
_ "github.com/go-sql-driver/mysql"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
@@ -23,7 +22,7 @@ func init() {
|
||||
|
||||
goman.New(func() {
|
||||
// 自动生成账单任务
|
||||
var ticker = time.NewTicker(1 * time.Minute)
|
||||
var ticker = time.NewTicker(1 * time.Hour)
|
||||
for range ticker.C {
|
||||
// 是否已经生成了,如果已经生成了就跳过
|
||||
var lastMonth = timeutil.Format("Ym", time.Now().AddDate(0, -1, 0))
|
||||
@@ -136,7 +135,7 @@ func (this *UserBillDAO) FindUnpaidBills(tx *dbs.Tx, size int64) (result []*User
|
||||
}
|
||||
|
||||
// CreateBill 创建账单
|
||||
func (this *UserBillDAO) CreateBill(tx *dbs.Tx, userId int64, billType BillType, description string, amount float32, month string) error {
|
||||
func (this *UserBillDAO) CreateBill(tx *dbs.Tx, userId int64, billType BillType, description string, amount float64, month string, canPay bool) error {
|
||||
code, err := this.GenerateBillCode(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -149,9 +148,11 @@ func (this *UserBillDAO) CreateBill(tx *dbs.Tx, userId int64, billType BillType,
|
||||
"amount": amount,
|
||||
"month": month,
|
||||
"code": code,
|
||||
"isPaid": false,
|
||||
"isPaid": amount == 0,
|
||||
"canPay": canPay,
|
||||
}, maps.Map{
|
||||
"amount": amount,
|
||||
"canPay": canPay,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -172,19 +173,22 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(regions) == 0 {
|
||||
return nil
|
||||
|
||||
var priceItems []*NodePriceItem
|
||||
if len(regions) > 0 {
|
||||
priceItems, err = SharedNodePriceItemDAO.FindAllEnabledRegionPrices(tx, NodePriceTypeTraffic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
priceItems, err := SharedNodePriceItemDAO.FindAllEnabledRegionPrices(tx, NodePriceTypeTraffic)
|
||||
// 默认计费方式
|
||||
userFinanceConfig, err := SharedSysSettingDAO.ReadUserFinanceConfig(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(priceItems) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 计算套餐费用
|
||||
// 计算服务套餐费用
|
||||
plans, err := SharedPlanDAO.FindAllEnabledPlans(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -194,54 +198,220 @@ func (this *UserBillDAO) GenerateBills(tx *dbs.Tx, month string) error {
|
||||
planMap[int64(plan.Id)] = plan
|
||||
}
|
||||
|
||||
stats, err := SharedServerDailyStatDAO.FindMonthlyStatsWithPlan(tx, month)
|
||||
var dayFrom = month + "01"
|
||||
var dayTo = month + "32"
|
||||
serverIds, err := SharedServerDailyStatDAO.FindDistinctServerIds(tx, dayFrom, dayTo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, stat := range stats {
|
||||
plan, ok := planMap[int64(stat.PlanId)]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if plan.PriceType != serverconfigs.PlanPriceTypeTraffic {
|
||||
continue
|
||||
}
|
||||
if len(plan.TrafficPrice) == 0 {
|
||||
continue
|
||||
}
|
||||
var priceConfig = &serverconfigs.PlanTrafficPrice{}
|
||||
err = json.Unmarshal([]byte(plan.TrafficPrice), priceConfig)
|
||||
var cacheMap = utils.NewCacheMap()
|
||||
var userIds = []int64{}
|
||||
for _, serverId := range serverIds {
|
||||
// 套餐类型
|
||||
userPlanId, userId, err := SharedServerDAO.FindServerLastUserPlanIdAndUserId(tx, serverId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if priceConfig.Base > 0 {
|
||||
var fee = priceConfig.Base * (float32(stat.Bytes) / 1024 / 1024 / 1024)
|
||||
err = SharedServerDailyStatDAO.UpdateStatFee(tx, int64(stat.Id), fee)
|
||||
if userId == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
userIds = append(userIds, userId)
|
||||
if userPlanId == 0 {
|
||||
// 总流量
|
||||
totalTrafficBytes, err := SharedServerDailyStatDAO.SumMonthlyBytes(tx, serverId, month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 默认计费方式
|
||||
if userFinanceConfig != nil && userFinanceConfig.IsOn { // 默认计费方式
|
||||
switch userFinanceConfig.PriceType {
|
||||
case serverconfigs.PlanPriceTypeTraffic:
|
||||
var config = userFinanceConfig.TrafficPriceConfig
|
||||
var fee float64 = 0
|
||||
if config != nil && config.Base > 0 {
|
||||
fee = float64(totalTrafficBytes) / 1024 / 1024 / 1024 * float64(config.Base)
|
||||
}
|
||||
|
||||
// 百分位
|
||||
var percentile = 95
|
||||
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = SharedServerBillDAO.CreateOrUpdateServerBill(tx, userId, serverId, month, userPlanId, 0, totalTrafficBytes, percentileBytes, percentile, userFinanceConfig.PriceType, fee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case serverconfigs.PlanPriceTypeBandwidth:
|
||||
// 百分位
|
||||
var percentile = 95
|
||||
var config = userFinanceConfig.BandwidthPriceConfig
|
||||
if config != nil {
|
||||
percentile = config.Percentile
|
||||
if percentile <= 0 {
|
||||
percentile = 95
|
||||
} else if percentile > 100 {
|
||||
percentile = 100
|
||||
}
|
||||
}
|
||||
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var mb = float32(percentileBytes) / 1024 / 1024
|
||||
var price float32
|
||||
if config != nil {
|
||||
price = config.LookupPrice(mb)
|
||||
}
|
||||
var fee = float64(price)
|
||||
err = SharedServerBillDAO.CreateOrUpdateServerBill(tx, userId, serverId, month, userPlanId, 0, totalTrafficBytes, percentileBytes, percentile, userFinanceConfig.PriceType, fee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else { // 区域流量计费
|
||||
var fee float64
|
||||
|
||||
for _, region := range regions {
|
||||
var regionId = int64(region.Id)
|
||||
var pricesMap = region.DecodePriceMap()
|
||||
if len(pricesMap) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
trafficBytes, err := SharedServerDailyStatDAO.SumServerMonthlyWithRegion(tx, serverId, regionId, month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if trafficBytes == 0 {
|
||||
continue
|
||||
}
|
||||
var itemId = SharedNodePriceItemDAO.SearchItemsWithBytes(priceItems, trafficBytes)
|
||||
if itemId == 0 {
|
||||
continue
|
||||
}
|
||||
price, ok := pricesMap[itemId]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if price <= 0 {
|
||||
continue
|
||||
}
|
||||
var regionFee = float64(trafficBytes) / 1000 / 1000 / 1000 * 8 * price
|
||||
fee += regionFee
|
||||
}
|
||||
|
||||
// 百分位
|
||||
var percentile = 95
|
||||
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = SharedServerBillDAO.CreateOrUpdateServerBill(tx, userId, serverId, month, userPlanId, 0, totalTrafficBytes, percentileBytes, percentile, "", fee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
userPlan, err := SharedUserPlanDAO.FindUserPlanWithoutState(tx, userPlanId, cacheMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if userPlan == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
plan, ok := planMap[int64(userPlan.PlanId)]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// 总流量
|
||||
totalTrafficBytes, err := SharedServerDailyStatDAO.SumMonthlyBytes(tx, serverId, month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
switch plan.PriceType {
|
||||
case serverconfigs.PlanPriceTypePeriod:
|
||||
// 已经在购买套餐的时候付过费,这里不再重复计费
|
||||
var fee float64 = 0
|
||||
|
||||
// 百分位
|
||||
var percentile = 95
|
||||
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = SharedServerBillDAO.CreateOrUpdateServerBill(tx, int64(userPlan.UserId), serverId, month, userPlanId, int64(userPlan.PlanId), totalTrafficBytes, percentileBytes, percentile, plan.PriceType, fee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case serverconfigs.PlanPriceTypeTraffic:
|
||||
var config = plan.DecodeTrafficPrice()
|
||||
var fee float64 = 0
|
||||
if config != nil && config.Base > 0 {
|
||||
fee = float64(totalTrafficBytes) / 1024 / 1024 / 1024 * float64(config.Base)
|
||||
}
|
||||
|
||||
// 百分位
|
||||
var percentile = 95
|
||||
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = SharedServerBillDAO.CreateOrUpdateServerBill(tx, int64(userPlan.UserId), serverId, month, userPlanId, int64(userPlan.PlanId), totalTrafficBytes, percentileBytes, percentile, plan.PriceType, fee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case serverconfigs.PlanPriceTypeBandwidth:
|
||||
// 百分位
|
||||
var percentile = 95
|
||||
var config = plan.DecodeBandwidthPrice()
|
||||
if config != nil {
|
||||
percentile = config.Percentile
|
||||
if percentile <= 0 {
|
||||
percentile = 95
|
||||
} else if percentile > 100 {
|
||||
percentile = 100
|
||||
}
|
||||
}
|
||||
percentileBytes, err := SharedServerDailyStatDAO.FindMonthlyPercentile(tx, serverId, month, percentile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var mb = float32(percentileBytes) / 1024 / 1024
|
||||
var price float32
|
||||
if config != nil {
|
||||
price = config.LookupPrice(mb)
|
||||
}
|
||||
var fee = float64(price)
|
||||
err = SharedServerBillDAO.CreateOrUpdateServerBill(tx, int64(userPlan.UserId), serverId, month, userPlanId, int64(userPlan.PlanId), totalTrafficBytes, percentileBytes, percentile, plan.PriceType, fee)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 用户
|
||||
offset := int64(0)
|
||||
size := int64(100) // 每次只查询N次,防止由于执行时间过长而锁表
|
||||
for {
|
||||
userIds, err := SharedUserDAO.ListEnabledUserIds(tx, offset, size)
|
||||
// 计算用户费用
|
||||
for _, userId := range userIds {
|
||||
if userId == 0 {
|
||||
continue
|
||||
}
|
||||
amount, err := SharedServerBillDAO.SumUserMonthlyAmount(tx, userId, month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
offset += size
|
||||
if len(userIds) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, userId := range userIds {
|
||||
// CDN流量账单
|
||||
err := this.generateTrafficBill(tx, userId, month, regions, priceItems)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = SharedUserBillDAO.CreateBill(tx, userId, BillTypeTraffic, "流量带宽费用", amount, month, month < timeutil.Format("Ym"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -256,74 +426,11 @@ func (this *UserBillDAO) UpdateUserBillIsPaid(tx *dbs.Tx, billId int64, isPaid b
|
||||
UpdateQuickly()
|
||||
}
|
||||
|
||||
// 生成CDN流量账单
|
||||
// month 格式YYYYMM
|
||||
func (this *UserBillDAO) generateTrafficBill(tx *dbs.Tx, userId int64, month string, regions []*NodeRegion, priceItems []*NodePriceItem) error {
|
||||
// 检查是否已经有账单了
|
||||
if month < timeutil.Format("Ym") {
|
||||
b, err := this.ExistBill(tx, userId, BillTypeTraffic, month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var cost = float32(0)
|
||||
for _, region := range regions {
|
||||
if len(region.Prices) == 0 || region.Prices == "null" {
|
||||
continue
|
||||
}
|
||||
priceMap := map[string]float32{}
|
||||
err := json.Unmarshal([]byte(region.Prices), &priceMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
trafficBytes, err := SharedServerDailyStatDAO.SumUserMonthlyWithoutPlan(tx, userId, int64(region.Id), month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if trafficBytes == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
itemId := SharedNodePriceItemDAO.SearchItemsWithBytes(priceItems, trafficBytes)
|
||||
if itemId == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
price, ok := priceMap[numberutils.FormatInt64(itemId)]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// 计算钱
|
||||
// 这里采用1000进制
|
||||
cost += (float32(trafficBytes*8) / 1_000_000_000) * price
|
||||
}
|
||||
|
||||
// 套餐费用
|
||||
planFee, err := SharedServerDailyStatDAO.SumUserMonthlyFee(tx, userId, month)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cost += float32(planFee)
|
||||
|
||||
if cost == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 创建账单
|
||||
return this.CreateBill(tx, userId, BillTypeTraffic, "按流量计费", cost, month)
|
||||
}
|
||||
|
||||
// BillTypeName 获取账单类型名称
|
||||
func (this *UserBillDAO) BillTypeName(billType BillType) string {
|
||||
switch billType {
|
||||
case BillTypeTraffic:
|
||||
return "流量"
|
||||
return "流量带宽"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -7,7 +7,10 @@ type UserBill struct {
|
||||
Type string `field:"type"` // 消费类型
|
||||
Description string `field:"description"` // 描述
|
||||
Amount float64 `field:"amount"` // 消费数额
|
||||
DayFrom string `field:"dayFrom"` // YYYYMMDD
|
||||
DayTo string `field:"dayTo"` // YYYYMMDD
|
||||
Month string `field:"month"` // 帐期YYYYMM
|
||||
CanPay uint8 `field:"canPay"` // 是否可以支付
|
||||
IsPaid uint8 `field:"isPaid"` // 是否已支付
|
||||
PaidAt uint64 `field:"paidAt"` // 支付时间
|
||||
Code string `field:"code"` // 账单编号
|
||||
@@ -20,7 +23,10 @@ type UserBillOperator struct {
|
||||
Type interface{} // 消费类型
|
||||
Description interface{} // 描述
|
||||
Amount interface{} // 消费数额
|
||||
DayFrom interface{} // YYYYMMDD
|
||||
DayTo interface{} // YYYYMMDD
|
||||
Month interface{} // 帐期YYYYMM
|
||||
CanPay interface{} // 是否可以支付
|
||||
IsPaid interface{} // 是否已支付
|
||||
PaidAt interface{} // 支付时间
|
||||
Code interface{} // 账单编号
|
||||
|
||||
@@ -95,6 +95,18 @@ func (this *UserDAO) FindEnabledBasicUser(tx *dbs.Tx, id int64) (*User, error) {
|
||||
return result.(*User), err
|
||||
}
|
||||
|
||||
// FindBasicUserWithoutState 查找用户基本信息,并忽略状态
|
||||
func (this *UserDAO) FindBasicUserWithoutState(tx *dbs.Tx, id int64) (*User, error) {
|
||||
result, err := this.Query(tx).
|
||||
Pk(id).
|
||||
Result("id", "fullname", "username").
|
||||
Find()
|
||||
if result == nil {
|
||||
return nil, err
|
||||
}
|
||||
return result.(*User), err
|
||||
}
|
||||
|
||||
// FindUserFullname 获取管理员名称
|
||||
func (this *UserDAO) FindUserFullname(tx *dbs.Tx, userId int64) (string, error) {
|
||||
return this.Query(tx).
|
||||
|
||||
@@ -85,6 +85,31 @@ func (this *UserPlanDAO) FindEnabledUserPlan(tx *dbs.Tx, userPlanId int64, cache
|
||||
return result.(*UserPlan), err
|
||||
}
|
||||
|
||||
// FindUserPlanWithoutState 查找套餐,并不检查状态
|
||||
// 防止因为删除套餐而导致计费失败
|
||||
func (this *UserPlanDAO) FindUserPlanWithoutState(tx *dbs.Tx, userPlanId int64, cacheMap *utils.CacheMap) (*UserPlan, error) {
|
||||
var cacheKey = this.Table + ":FindUserPlanWithoutState:" + types.String(userPlanId)
|
||||
if cacheMap != nil {
|
||||
cache, ok := cacheMap.Get(cacheKey)
|
||||
if ok {
|
||||
return cache.(*UserPlan), nil
|
||||
}
|
||||
}
|
||||
|
||||
result, err := this.Query(tx).
|
||||
Pk(userPlanId).
|
||||
Find()
|
||||
if result == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cacheMap != nil {
|
||||
cacheMap.Put(cacheKey, result)
|
||||
}
|
||||
|
||||
return result.(*UserPlan), err
|
||||
}
|
||||
|
||||
// CountAllEnabledUserPlans 计算套餐数量
|
||||
func (this *UserPlanDAO) CountAllEnabledUserPlans(tx *dbs.Tx, userId int64, isAvailable bool, isExpired bool, expiringDays int32) (int64, error) {
|
||||
var query = this.Query(tx).
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/events"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/goman"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/setup"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/go-yaml/yaml"
|
||||
@@ -214,10 +216,10 @@ func (this *APINode) listenRPC(listener net.Listener, tlsConfig *tls.Config) err
|
||||
var rpcServer *grpc.Server
|
||||
if tlsConfig == nil {
|
||||
remotelogs.Println("API_NODE", "listening GRPC http://"+listener.Addr().String()+" ...")
|
||||
rpcServer = grpc.NewServer(grpc.MaxRecvMsgSize(128 * 1024 * 1024))
|
||||
rpcServer = grpc.NewServer(grpc.MaxRecvMsgSize(128*1024*1024), grpc.UnaryInterceptor(this.unaryInterceptor))
|
||||
} else {
|
||||
logs.Println("[API_NODE]listening GRPC https://" + listener.Addr().String() + " ...")
|
||||
rpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.MaxRecvMsgSize(128*1024*1024))
|
||||
rpcServer = grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)), grpc.MaxRecvMsgSize(128*1024*1024), grpc.UnaryInterceptor(this.unaryInterceptor))
|
||||
}
|
||||
this.registerServices(rpcServer)
|
||||
err := rpcServer.Serve(listener)
|
||||
@@ -572,6 +574,11 @@ func (this *APINode) listenSock() error {
|
||||
"result": result,
|
||||
},
|
||||
})
|
||||
case "debug":
|
||||
teaconst.Debug = !teaconst.Debug
|
||||
_ = cmd.Reply(&gosock.Command{
|
||||
Params: map[string]interface{}{"debug": teaconst.Debug},
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
@@ -588,3 +595,29 @@ func (this *APINode) listenSock() error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 服务过滤器
|
||||
func (this *APINode) unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
if teaconst.Debug {
|
||||
var before = time.Now()
|
||||
var traceCtx = rpc.NewContext(ctx)
|
||||
resp, err = handler(traceCtx, req)
|
||||
|
||||
var costMs = time.Since(before).Seconds() * 1000
|
||||
statErr := models.SharedAPIMethodStatDAO.CreateStat(nil, info.FullMethod, "", costMs)
|
||||
if statErr != nil {
|
||||
remotelogs.Error("API_NODE", "create method stat failed: "+statErr.Error())
|
||||
}
|
||||
|
||||
var tagMap = traceCtx.TagMap()
|
||||
for tag, tagCostMs := range tagMap {
|
||||
statErr = models.SharedAPIMethodStatDAO.CreateStat(nil, info.FullMethod, tag, tagCostMs)
|
||||
if statErr != nil {
|
||||
remotelogs.Error("API_NODE", "create method stat failed: "+statErr.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
return handler(ctx, req)
|
||||
}
|
||||
|
||||
@@ -63,6 +63,11 @@ func (this *APINode) registerServices(server *grpc.Server) {
|
||||
pb.RegisterAPINodeServiceServer(server, instance)
|
||||
this.rest(instance)
|
||||
}
|
||||
{
|
||||
instance := this.serviceInstance(&services.APIMethodStatService{}).(*services.APIMethodStatService)
|
||||
pb.RegisterAPIMethodStatServiceServer(server, instance)
|
||||
this.rest(instance)
|
||||
}
|
||||
{
|
||||
instance := this.serviceInstance(&services.OriginService{}).(*services.OriginService)
|
||||
pb.RegisterOriginServiceServer(server, instance)
|
||||
@@ -343,6 +348,11 @@ func (this *APINode) registerServices(server *grpc.Server) {
|
||||
pb.RegisterUserBillServiceServer(server, instance)
|
||||
this.rest(instance)
|
||||
}
|
||||
{
|
||||
instance := this.serviceInstance(&services.ServerBillService{}).(*services.ServerBillService)
|
||||
pb.RegisterServerBillServiceServer(server, instance)
|
||||
this.rest(instance)
|
||||
}
|
||||
{
|
||||
instance := this.serviceInstance(&services.UserNodeService{}).(*services.UserNodeService)
|
||||
pb.RegisterUserNodeServiceServer(server, instance)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package nodes
|
||||
|
||||
|
||||
46
internal/rpc/context.go
Normal file
46
internal/rpc/context.go
Normal file
@@ -0,0 +1,46 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
context.Context
|
||||
|
||||
tagMap map[string]time.Time
|
||||
costMap map[string]float64 // tag => costMs
|
||||
locker sync.Mutex
|
||||
}
|
||||
|
||||
func NewContext(ctx context.Context) *Context {
|
||||
return &Context{
|
||||
Context: ctx,
|
||||
tagMap: map[string]time.Time{},
|
||||
costMap: map[string]float64{},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *Context) Begin(tag string) {
|
||||
this.locker.Lock()
|
||||
this.tagMap[tag] = time.Now()
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *Context) End(tag string) {
|
||||
this.locker.Lock()
|
||||
begin, ok := this.tagMap[tag]
|
||||
if ok {
|
||||
this.costMap[tag] = time.Since(begin).Seconds() * 1000
|
||||
}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *Context) TagMap() map[string]float64 {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
return this.costMap
|
||||
}
|
||||
@@ -477,7 +477,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
var tx = this.NullTx()
|
||||
|
||||
// 默认集群
|
||||
this.BeginTag(ctx, "SharedNodeClusterDAO.ListEnabledClusters")
|
||||
nodeClusters, err := models.SharedNodeClusterDAO.ListEnabledClusters(tx, "", 0, 1)
|
||||
this.EndTag(ctx, "SharedNodeClusterDAO.ListEnabledClusters")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -486,84 +488,108 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
}
|
||||
|
||||
// 集群数
|
||||
this.BeginTag(ctx, "SharedNodeClusterDAO.CountAllEnabledClusters")
|
||||
countClusters, err := models.SharedNodeClusterDAO.CountAllEnabledClusters(tx, "")
|
||||
this.EndTag(ctx, "SharedNodeClusterDAO.CountAllEnabledClusters")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountNodeClusters = countClusters
|
||||
|
||||
// 节点数
|
||||
this.BeginTag(ctx, "SharedNodeDAO.CountAllEnabledNodes")
|
||||
countNodes, err := models.SharedNodeDAO.CountAllEnabledNodes(tx)
|
||||
this.EndTag(ctx, "SharedNodeDAO.CountAllEnabledNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountNodes = countNodes
|
||||
|
||||
// 离线节点
|
||||
this.BeginTag(ctx, "SharedNodeDAO.CountAllEnabledOfflineNodes")
|
||||
countOfflineNodes, err := models.SharedNodeDAO.CountAllEnabledOfflineNodes(tx)
|
||||
this.EndTag(ctx, "SharedNodeDAO.CountAllEnabledOfflineNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountOfflineNodes = countOfflineNodes
|
||||
|
||||
// 服务数
|
||||
this.BeginTag(ctx, "SharedServerDAO.CountAllEnabledServers")
|
||||
countServers, err := models.SharedServerDAO.CountAllEnabledServers(tx)
|
||||
this.EndTag(ctx, "SharedServerDAO.CountAllEnabledServers")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountServers = countServers
|
||||
|
||||
this.BeginTag(ctx, "SharedServerDAO.CountAllEnabledServersMatch")
|
||||
countAuditingServers, err := models.SharedServerDAO.CountAllEnabledServersMatch(tx, 0, "", 0, 0, configutils.BoolStateYes, nil)
|
||||
this.EndTag(ctx, "SharedServerDAO.CountAllEnabledServersMatch")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountAuditingServers = countAuditingServers
|
||||
|
||||
// 用户数
|
||||
this.BeginTag(ctx, "SharedUserDAO.CountAllEnabledUsers")
|
||||
countUsers, err := models.SharedUserDAO.CountAllEnabledUsers(tx, 0, "", false)
|
||||
this.EndTag(ctx, "SharedUserDAO.CountAllEnabledUsers")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountUsers = countUsers
|
||||
|
||||
// API节点数
|
||||
this.BeginTag(ctx, "SharedAPINodeDAO.CountAllEnabledAndOnAPINodes")
|
||||
countAPINodes, err := models.SharedAPINodeDAO.CountAllEnabledAndOnAPINodes(tx)
|
||||
this.EndTag(ctx, "SharedAPINodeDAO.CountAllEnabledAndOnAPINodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountAPINodes = countAPINodes
|
||||
|
||||
// 离线API节点
|
||||
this.BeginTag(ctx, "SharedAPINodeDAO.CountAllEnabledAndOnOfflineAPINodes")
|
||||
countOfflineAPINodes, err := models.SharedAPINodeDAO.CountAllEnabledAndOnOfflineAPINodes(tx)
|
||||
this.EndTag(ctx, "SharedAPINodeDAO.CountAllEnabledAndOnOfflineAPINodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountOfflineAPINodes = countOfflineAPINodes
|
||||
|
||||
// 数据库节点数
|
||||
this.BeginTag(ctx, "SharedDBNodeDAO.CountAllEnabledNodes")
|
||||
countDBNodes, err := models.SharedDBNodeDAO.CountAllEnabledNodes(tx)
|
||||
this.EndTag(ctx, "SharedDBNodeDAO.CountAllEnabledNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountDBNodes = countDBNodes
|
||||
|
||||
// 用户节点数
|
||||
this.BeginTag(ctx, "SharedUserNodeDAO.CountAllEnabledAndOnUserNodes")
|
||||
countUserNodes, err := models.SharedUserNodeDAO.CountAllEnabledAndOnUserNodes(tx)
|
||||
this.EndTag(ctx, "SharedUserNodeDAO.CountAllEnabledAndOnUserNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountUserNodes = countUserNodes
|
||||
|
||||
// 离线用户节点数
|
||||
this.BeginTag(ctx, "SharedUserNodeDAO.CountAllEnabledAndOnOfflineNodes")
|
||||
countOfflineUserNodes, err := models.SharedUserNodeDAO.CountAllEnabledAndOnOfflineNodes(tx)
|
||||
this.EndTag(ctx, "SharedUserNodeDAO.CountAllEnabledAndOnOfflineNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.CountOfflineUserNodes = countOfflineUserNodes
|
||||
|
||||
// 按日流量统计
|
||||
this.BeginTag(ctx, "SharedTrafficDailyStatDAO.FindDailyStats")
|
||||
dayFrom := timeutil.Format("Ymd", time.Now().AddDate(0, 0, -14))
|
||||
dailyTrafficStats, err := stats.SharedTrafficDailyStatDAO.FindDailyStats(tx, dayFrom, timeutil.Format("Ymd"))
|
||||
this.EndTag(ctx, "SharedTrafficDailyStatDAO.FindDailyStats")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -582,7 +608,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
// 小时流量统计
|
||||
hourFrom := timeutil.Format("YmdH", time.Now().Add(-23*time.Hour))
|
||||
hourTo := timeutil.Format("YmdH")
|
||||
this.BeginTag(ctx, "SharedTrafficHourlyStatDAO.FindHourlyStats")
|
||||
hourlyTrafficStats, err := stats.SharedTrafficHourlyStatDAO.FindHourlyStats(tx, hourFrom, hourTo)
|
||||
this.EndTag(ctx, "SharedTrafficHourlyStatDAO.FindHourlyStats")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -609,7 +637,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: teaconst.NodeVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedNodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := models.SharedNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedNodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -622,7 +652,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: teaconst.MonitorNodeVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedMonitorNodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := models.SharedMonitorNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedMonitorNodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -635,7 +667,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: teaconst.AuthorityNodeVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedAuthorityNodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := authority.SharedAuthorityNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedAuthorityNodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -648,7 +682,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: teaconst.UserNodeVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedUserNodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := models.SharedUserNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedUserNodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -665,7 +701,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: apiVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedAPINodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := models.SharedAPINodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedAPINodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -678,7 +716,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: teaconst.DNSNodeVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedNSNodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := models.SharedNSNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedNSNodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -691,7 +731,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
upgradeInfo := &pb.ComposeAdminDashboardResponse_UpgradeInfo{
|
||||
NewVersion: teaconst.ReportNodeVersion,
|
||||
}
|
||||
this.BeginTag(ctx, "SharedReportNodeDAO.CountAllLowerVersionNodes")
|
||||
countNodes, err := models.SharedReportNodeDAO.CountAllLowerVersionNodes(tx, upgradeInfo.NewVersion)
|
||||
this.EndTag(ctx, "SharedReportNodeDAO.CountAllLowerVersionNodes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -700,7 +742,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
}
|
||||
|
||||
// 域名排行
|
||||
this.BeginTag(ctx, "SharedServerDomainHourlyStatDAO.FindTopDomainStats")
|
||||
topDomainStats, err := stats.SharedServerDomainHourlyStatDAO.FindTopDomainStats(tx, hourFrom, hourTo, 10)
|
||||
this.EndTag(ctx, "SharedServerDomainHourlyStatDAO.FindTopDomainStats")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -715,7 +759,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
|
||||
// 节点排行
|
||||
if isPlus {
|
||||
this.BeginTag(ctx, "SharedNodeTrafficHourlyStatDAO.FindTopNodeStats")
|
||||
topNodeStats, err := stats.SharedNodeTrafficHourlyStatDAO.FindTopNodeStats(tx, "node", hourFrom, hourTo, 10)
|
||||
this.EndTag(ctx, "SharedNodeTrafficHourlyStatDAO.FindTopNodeStats")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -738,7 +784,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
|
||||
// 地区流量排行
|
||||
if isPlus {
|
||||
this.BeginTag(ctx, "SharedServerRegionCountryDailyStatDAO.SumDailyTotalBytes")
|
||||
totalCountryBytes, err := stats.SharedServerRegionCountryDailyStatDAO.SumDailyTotalBytes(tx, timeutil.Format("Ymd"))
|
||||
this.EndTag(ctx, "SharedServerRegionCountryDailyStatDAO.SumDailyTotalBytes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -767,7 +815,9 @@ func (this *AdminService) ComposeAdminDashboard(ctx context.Context, req *pb.Com
|
||||
}
|
||||
|
||||
// 指标数据
|
||||
this.BeginTag(ctx, "findMetricDataCharts")
|
||||
pbCharts, err := this.findMetricDataCharts(tx)
|
||||
this.EndTag(ctx, "findMetricDataCharts")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
83
internal/rpc/services/service_api_method_stat.go
Normal file
83
internal/rpc/services/service_api_method_stat.go
Normal file
@@ -0,0 +1,83 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
timeutil "github.com/iwind/TeaGo/utils/time"
|
||||
)
|
||||
|
||||
// APIMethodStatService API方法统计服务
|
||||
type APIMethodStatService struct {
|
||||
BaseService
|
||||
}
|
||||
|
||||
// FindAPIMethodStatsWithDay 查找某天的统计
|
||||
func (this *APIMethodStatService) FindAPIMethodStatsWithDay(ctx context.Context, req *pb.FindAPIMethodStatsWithDayRequest) (*pb.FindAPIMethodStatsWithDayResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var day = req.Day
|
||||
if len(day) == 0 {
|
||||
day = timeutil.Format("Ymd")
|
||||
}
|
||||
var tx = this.NullTx()
|
||||
stats, err := models.SharedAPIMethodStatDAO.FindAllStatsWithDay(tx, day)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var pbStats = []*pb.APIMethodStat{}
|
||||
var cacheMap = utils.NewCacheMap()
|
||||
for _, stat := range stats {
|
||||
apiNode, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, int64(stat.ApiNodeId), cacheMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if apiNode == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
pbStats = append(pbStats, &pb.APIMethodStat{
|
||||
Id: int64(stat.Id),
|
||||
ApiNodeId: int64(stat.ApiNodeId),
|
||||
Method: stat.Method,
|
||||
Tag: stat.Tag,
|
||||
CostMs: float32(stat.CostMs),
|
||||
PeekMs: float32(stat.PeekMs),
|
||||
CountCalls: int64(stat.CountCalls),
|
||||
ApiNode: &pb.APINode{
|
||||
Id: int64(apiNode.Id),
|
||||
Name: apiNode.Name,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
return &pb.FindAPIMethodStatsWithDayResponse{
|
||||
ApiMethodStats: pbStats,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CountAPIMethodStatsWithDay 检查是否有统计数据
|
||||
func (this *APIMethodStatService) CountAPIMethodStatsWithDay(ctx context.Context, req *pb.CountAPIMethodStatsWithDayRequest) (*pb.RPCCountResponse, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var day = req.Day
|
||||
if len(day) == 0 {
|
||||
day = timeutil.Format("Ymd")
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
count, err := models.SharedAPIMethodStatDAO.CountAllStatsWithDay(tx, day)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.SuccessCount(count)
|
||||
}
|
||||
@@ -189,7 +189,7 @@ func (this *APINodeService) FindEnabledAPINode(ctx context.Context, req *pb.Find
|
||||
|
||||
tx := this.NullTx()
|
||||
|
||||
node, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, req.ApiNodeId)
|
||||
node, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, req.ApiNodeId, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -241,7 +241,7 @@ func (this *APINodeService) FindCurrentAPINode(ctx context.Context, req *pb.Find
|
||||
|
||||
var nodeId = teaconst.NodeId
|
||||
var tx *dbs.Tx
|
||||
node, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, nodeId)
|
||||
node, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, nodeId, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -295,3 +295,14 @@ func (this *APINodeService) CountAllEnabledAPINodesWithSSLCertId(ctx context.Con
|
||||
}
|
||||
return this.SuccessCount(count)
|
||||
}
|
||||
|
||||
// DebugAPINode 修改调试模式状态
|
||||
func (this *APINodeService) DebugAPINode(ctx context.Context, req *pb.DebugAPINodeRequest) (*pb.RPCSuccess, error) {
|
||||
_, err := this.ValidateAdmin(ctx, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
teaconst.Debug = req.Debug
|
||||
return this.Success()
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models/authority"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/encrypt"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/rpc"
|
||||
rpcutils "github.com/TeaOSLab/EdgeAPI/internal/rpc/utils"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
@@ -236,3 +237,25 @@ func (this *BaseService) RunTx(callback func(tx *dbs.Tx) error) error {
|
||||
}
|
||||
return db.RunTx(callback)
|
||||
}
|
||||
|
||||
// BeginTag 开始标签统计
|
||||
func (this *BaseService) BeginTag(ctx context.Context, name string) {
|
||||
if !teaconst.Debug {
|
||||
return
|
||||
}
|
||||
traceCtx, ok := ctx.(*rpc.Context)
|
||||
if ok {
|
||||
traceCtx.Begin(name)
|
||||
}
|
||||
}
|
||||
|
||||
// EndTag 结束标签统计
|
||||
func (this *BaseService) EndTag(ctx context.Context, name string) {
|
||||
if !teaconst.Debug {
|
||||
return
|
||||
}
|
||||
traceCtx, ok := ctx.(*rpc.Context)
|
||||
if ok {
|
||||
traceCtx.End(name)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,7 +197,7 @@ func (this *NodeClusterService) FindAPINodesWithNodeCluster(ctx context.Context,
|
||||
if len(apiNodeIds) > 0 {
|
||||
apiNodes := []*pb.APINode{}
|
||||
for _, apiNodeId := range apiNodeIds {
|
||||
apiNode, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, apiNodeId)
|
||||
apiNode, err := models.SharedAPINodeDAO.FindEnabledAPINode(tx, apiNodeId, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ func (this *NodeTaskService) FindNodeTasks(ctx context.Context, req *pb.FindNode
|
||||
Type: task.Type,
|
||||
Version: int64(task.Version),
|
||||
IsPrimary: primaryNodeId == nodeId,
|
||||
ServerId: int64(task.ServerId),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -137,6 +138,7 @@ func (this *NodeTaskService) FindNodeClusterTasks(ctx context.Context, req *pb.F
|
||||
IsOk: task.IsOk == 1,
|
||||
Error: task.Error,
|
||||
UpdatedAt: int64(task.UpdatedAt),
|
||||
ServerId: int64(task.ServerId),
|
||||
Node: &pb.Node{
|
||||
Id: int64(task.NodeId),
|
||||
Name: nodeName,
|
||||
@@ -261,6 +263,7 @@ func (this *NodeTaskService) FindNotifyingNodeTasks(ctx context.Context, req *pb
|
||||
Error: task.Error,
|
||||
UpdatedAt: int64(task.UpdatedAt),
|
||||
Node: &pb.Node{Id: int64(task.NodeId)},
|
||||
ServerId: int64(task.ServerId),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build community
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package services
|
||||
|
||||
|
||||
@@ -1924,3 +1924,29 @@ func (this *ServerService) FindServerUserPlan(ctx context.Context, req *pb.FindS
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ComposeServerConfig 获取服务配置
|
||||
func (this *ServerService) ComposeServerConfig(ctx context.Context, req *pb.ComposeServerConfigRequest) (*pb.ComposeServerConfigResponse, error) {
|
||||
_, err := this.ValidateNode(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
serverConfig, err := models.SharedServerDAO.ComposeServerConfigWithServerId(tx, req.ServerId, true)
|
||||
if err != nil {
|
||||
if err == models.ErrNotFound {
|
||||
return &pb.ComposeServerConfigResponse{ServerConfigJSON: nil}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if serverConfig == nil {
|
||||
return &pb.ComposeServerConfigResponse{ServerConfigJSON: nil}, nil
|
||||
}
|
||||
|
||||
configJSON, err := json.Marshal(serverConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.ComposeServerConfigResponse{ServerConfigJSON: configJSON}, nil
|
||||
}
|
||||
|
||||
132
internal/rpc/services/service_server_bill.go
Normal file
132
internal/rpc/services/service_server_bill.go
Normal file
@@ -0,0 +1,132 @@
|
||||
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
)
|
||||
|
||||
// ServerBillService 服务账单相关服务
|
||||
type ServerBillService struct {
|
||||
BaseService
|
||||
}
|
||||
|
||||
// CountAllServerBills 查询服务账单数量
|
||||
func (this *ServerBillService) CountAllServerBills(ctx context.Context, req *pb.CountAllServerBillsRequest) (*pb.RPCCountResponse, error) {
|
||||
_, userId, err := this.ValidateAdminAndUser(ctx, 0, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if userId > 0 {
|
||||
req.UserId = userId
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
count, err := models.SharedServerBillDAO.CountServerBills(tx, req.UserId, req.Month)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return this.SuccessCount(count)
|
||||
}
|
||||
|
||||
// ListServerBills 查询服务账单列表
|
||||
func (this *ServerBillService) ListServerBills(ctx context.Context, req *pb.ListServerBillsRequest) (*pb.ListServerBillsResponse, error) {
|
||||
_, userId, err := this.ValidateAdminAndUser(ctx, 0, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if userId > 0 {
|
||||
req.UserId = userId
|
||||
}
|
||||
|
||||
var tx = this.NullTx()
|
||||
serverBills, err := models.SharedServerBillDAO.ListServerBills(tx, req.UserId, req.Month, req.Offset, req.Size)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var pbServerBills = []*pb.ServerBill{}
|
||||
var cacheMap = utils.NewCacheMap()
|
||||
for _, bill := range serverBills {
|
||||
// user
|
||||
user, err := models.SharedUserDAO.FindBasicUserWithoutState(tx, int64(bill.UserId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var pbUser = &pb.User{Id: int64(bill.UserId)}
|
||||
if user != nil {
|
||||
pbUser = &pb.User{
|
||||
Id: int64(bill.UserId),
|
||||
Username: user.Username,
|
||||
Fullname: user.Fullname,
|
||||
}
|
||||
}
|
||||
|
||||
// plan
|
||||
var pbPlan *pb.Plan
|
||||
if bill.PlanId > 0 {
|
||||
plan, err := models.SharedPlanDAO.FindEnabledPlan(tx, int64(bill.PlanId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if plan != nil {
|
||||
pbPlan = &pb.Plan{
|
||||
Id: int64(plan.Id),
|
||||
Name: plan.Name,
|
||||
PriceType: plan.PriceType,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// user plan
|
||||
var pbUserPlan *pb.UserPlan
|
||||
if bill.UserPlanId > 0 {
|
||||
userPlan, err := models.SharedUserPlanDAO.FindEnabledUserPlan(tx, int64(bill.UserPlanId), cacheMap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if userPlan != nil {
|
||||
pbUserPlan = &pb.UserPlan{
|
||||
Id: int64(userPlan.Id),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// server
|
||||
var pbServer *pb.Server
|
||||
if bill.ServerId > 0 {
|
||||
server, err := models.SharedServerDAO.FindEnabledServerBasic(tx, int64(bill.ServerId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if server != nil {
|
||||
pbServer = &pb.Server{Id: int64(bill.ServerId), Name: server.Name}
|
||||
}
|
||||
}
|
||||
|
||||
pbServerBills = append(pbServerBills, &pb.ServerBill{
|
||||
Id: int64(bill.Id),
|
||||
UserId: int64(bill.UserId),
|
||||
ServerId: int64(bill.ServerId),
|
||||
Amount: float32(bill.Amount),
|
||||
PriceType: bill.PriceType,
|
||||
CreatedAt: int64(bill.CreatedAt),
|
||||
UserPlanId: int64(bill.UserPlanId),
|
||||
PlanId: int64(bill.PlanId),
|
||||
TotalTrafficBytes: int64(bill.TotalTrafficBytes),
|
||||
BandwidthPercentileBytes: int64(bill.BandwidthPercentileBytes),
|
||||
BandwidthPercentile: int32(bill.BandwidthPercentile),
|
||||
User: pbUser,
|
||||
Plan: pbPlan,
|
||||
UserPlan: pbUserPlan,
|
||||
Server: pbServer,
|
||||
})
|
||||
}
|
||||
|
||||
return &pb.ListServerBillsResponse{ServerBills: pbServerBills}, nil
|
||||
}
|
||||
@@ -406,7 +406,7 @@ func (this *UserService) ComposeUserDashboard(ctx context.Context, req *pb.Compo
|
||||
|
||||
// 本月总流量
|
||||
month := timeutil.Format("Ym")
|
||||
monthlyTrafficBytes, err := models.SharedServerDailyStatDAO.SumUserMonthly(tx, req.UserId, 0, month)
|
||||
monthlyTrafficBytes, err := models.SharedServerDailyStatDAO.SumUserMonthly(tx, req.UserId, month)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ func (this *UserBillService) ListUserBills(ctx context.Context, req *pb.ListUser
|
||||
}
|
||||
result := []*pb.UserBill{}
|
||||
for _, bill := range bills {
|
||||
user, err := models.SharedUserDAO.FindEnabledBasicUser(tx, int64(bill.UserId))
|
||||
user, err := models.SharedUserDAO.FindBasicUserWithoutState(tx, int64(bill.UserId))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -85,15 +85,17 @@ func (this *UserBillService) ListUserBills(ctx context.Context, req *pb.ListUser
|
||||
result = append(result, &pb.UserBill{
|
||||
Id: int64(bill.Id),
|
||||
User: &pb.User{
|
||||
Id: int64(bill.UserId),
|
||||
Fullname: user.Fullname,
|
||||
Username: user.Username,
|
||||
Id: int64(bill.UserId),
|
||||
Fullname: user.Fullname,
|
||||
Username: user.Username,
|
||||
IsDeleted: user.State == models.UserStateDisabled,
|
||||
},
|
||||
Type: bill.Type,
|
||||
TypeName: models.SharedUserBillDAO.BillTypeName(bill.Type),
|
||||
Description: bill.Description,
|
||||
Amount: float32(bill.Amount),
|
||||
Month: bill.Month,
|
||||
CanPay: bill.CanPay == 1,
|
||||
IsPaid: bill.IsPaid == 1,
|
||||
PaidAt: int64(bill.PaidAt),
|
||||
Code: bill.Code,
|
||||
@@ -151,6 +153,7 @@ func (this *UserBillService) FindUserBill(ctx context.Context, req *pb.FindUserB
|
||||
Description: bill.Description,
|
||||
Amount: float32(bill.Amount),
|
||||
Month: bill.Month,
|
||||
CanPay: bill.CanPay == 1,
|
||||
IsPaid: bill.IsPaid == 1,
|
||||
PaidAt: int64(bill.PaidAt),
|
||||
Code: bill.Code,
|
||||
@@ -194,6 +197,10 @@ func (this *UserBillService) PayUserBill(ctx context.Context, req *pb.PayUserBil
|
||||
return models.SharedUserBillDAO.UpdateUserBillIsPaid(tx, req.UserBillId, true)
|
||||
}
|
||||
|
||||
if bill.CanPay == 0 {
|
||||
return errors.New("can not pay now")
|
||||
}
|
||||
|
||||
// 余额是否足够
|
||||
account, err := accounts.SharedUserAccountDAO.FindUserAccountWithUserId(tx, userId)
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
//go:build community
|
||||
// +build community
|
||||
//go:build !plus
|
||||
// +build !plus
|
||||
|
||||
package services
|
||||
|
||||
|
||||
@@ -167,7 +167,7 @@ func (this *Setup) Run() error {
|
||||
apiNodeId = nodeId
|
||||
}
|
||||
|
||||
apiNode, err := dao.FindEnabledAPINode(nil, apiNodeId)
|
||||
apiNode, err := dao.FindEnabledAPINode(nil, apiNodeId, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -117,15 +117,16 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
|
||||
|
||||
// 新增表格
|
||||
for _, newTable := range newResult.Tables {
|
||||
oldTable := currentResult.FindTable(newTable.Name)
|
||||
var oldTable = currentResult.FindTable(newTable.Name)
|
||||
if oldTable == nil {
|
||||
ops = append(ops, "+ table "+newTable.Name)
|
||||
var op = "+ table " + newTable.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("+ table " + newTable.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec(newTable.Definition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
} else if oldTable.Definition != newTable.Definition {
|
||||
// 对比字段
|
||||
@@ -133,22 +134,24 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
|
||||
for _, newField := range newTable.Fields {
|
||||
oldField := oldTable.FindField(newField.Name)
|
||||
if oldField == nil {
|
||||
ops = append(ops, "+ "+newTable.Name+" "+newField.Name)
|
||||
var op = "+ " + newTable.Name + " " + newField.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("+ " + newTable.Name + " " + newField.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + newTable.Name + " ADD `" + newField.Name + "` " + newField.Definition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
} else if !newField.EqualDefinition(oldField.Definition) {
|
||||
ops = append(ops, "* "+newTable.Name+" "+newField.Name)
|
||||
var op = "* " + newTable.Name + " " + newField.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("* " + newTable.Name + " " + newField.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + newTable.Name + " MODIFY `" + newField.Name + "` " + newField.Definition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -158,31 +161,33 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
|
||||
for _, newIndex := range newTable.Indexes {
|
||||
oldIndex := oldTable.FindIndex(newIndex.Name)
|
||||
if oldIndex == nil {
|
||||
ops = append(ops, "+ index "+newTable.Name+" "+newIndex.Name)
|
||||
var op = "+ index " + newTable.Name + " " + newIndex.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("+ index " + newTable.Name + " " + newIndex.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + newTable.Name + " ADD " + newIndex.Definition)
|
||||
if err != nil {
|
||||
err = this.tryCreateIndex(err, db, newTable.Name, newIndex.Definition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
} else if oldIndex.Definition != newIndex.Definition {
|
||||
ops = append(ops, "* index "+newTable.Name+" "+newIndex.Name)
|
||||
var op = "* index " + newTable.Name + " " + newIndex.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("* index " + newTable.Name + " " + newIndex.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + newTable.Name + " DROP KEY " + newIndex.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' drop old key failed: " + err.Error())
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + newTable.Name + " ADD " + newIndex.Definition)
|
||||
if err != nil {
|
||||
err = this.tryCreateIndex(err, db, newTable.Name, newIndex.Definition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -192,13 +197,14 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
|
||||
for _, oldIndex := range oldTable.Indexes {
|
||||
newIndex := newTable.FindIndex(oldIndex.Name)
|
||||
if newIndex == nil {
|
||||
ops = append(ops, "- index "+oldTable.Name+" "+oldIndex.Name)
|
||||
var op = "- index " + oldTable.Name + " " + oldIndex.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("- index " + oldTable.Name + " " + oldIndex.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + oldTable.Name + " DROP KEY " + oldIndex.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -208,13 +214,14 @@ func (this *SQLDump) Apply(db *dbs.DB, newResult *SQLDumpResult, showLog bool) (
|
||||
for _, oldField := range oldTable.Fields {
|
||||
newField := newTable.FindField(oldField.Name)
|
||||
if newField == nil {
|
||||
ops = append(ops, "- field "+oldTable.Name+" "+oldField.Name)
|
||||
var op = "- field " + oldTable.Name + " " + oldField.Name
|
||||
ops = append(ops, op)
|
||||
if showLog {
|
||||
fmt.Println("- field " + oldTable.Name + " " + oldField.Name)
|
||||
fmt.Println(op)
|
||||
}
|
||||
_, err = db.Exec("ALTER TABLE " + oldTable.Name + " DROP COLUMN `" + oldField.Name + "`")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, errors.New("'" + op + "' failed: " + err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/acme"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models/stats"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/errors"
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/utils"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/dnsconfigs"
|
||||
@@ -65,6 +66,9 @@ var upgradeFuncs = []*upgradeVersion{
|
||||
{
|
||||
"0.4.0", upgradeV0_4_0,
|
||||
},
|
||||
{
|
||||
"0.4.1", upgradeV0_4_1,
|
||||
},
|
||||
}
|
||||
|
||||
// UpgradeSQLData 升级SQL数据
|
||||
@@ -569,3 +573,20 @@ func upgradeV0_4_0(db *dbs.DB) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// v0.4.1
|
||||
func upgradeV0_4_1(db *dbs.DB) error {
|
||||
// 升级 servers.lastUserPlanId
|
||||
_, err := db.Exec("UPDATE edgeServers SET lastUserPlanId=userPlanId WHERE userPlanId>0")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 执行域名统计清理
|
||||
err = stats.NewServerDomainHourlyStatDAO().Clean(nil, 7)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -85,7 +85,6 @@ func TestUpgradeSQLData_v0_3_7(t *testing.T) {
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
|
||||
func TestUpgradeSQLData_v0_4_0(t *testing.T) {
|
||||
db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{
|
||||
Driver: "mysql",
|
||||
@@ -101,3 +100,19 @@ func TestUpgradeSQLData_v0_4_0(t *testing.T) {
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestUpgradeSQLData_v0_4_1(t *testing.T) {
|
||||
db, err := dbs.NewInstanceFromConfig(&dbs.DBConfig{
|
||||
Driver: "mysql",
|
||||
Dsn: "root:123456@tcp(127.0.0.1:3306)/db_edge?charset=utf8mb4&timeout=30s",
|
||||
Prefix: "edge",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = upgradeV0_4_1(db)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
@@ -8,13 +8,15 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/systemconfigs"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"github.com/iwind/TeaGo/logs"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func init() {
|
||||
dbs.OnReadyDone(func() {
|
||||
task := NewNodeMonitorTask(60)
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
var task = NewNodeMonitorTask(60)
|
||||
var ticker = time.NewTicker(60 * time.Second)
|
||||
goman.New(func() {
|
||||
for range ticker.C {
|
||||
err := task.loop()
|
||||
@@ -29,11 +31,16 @@ func init() {
|
||||
// NodeMonitorTask 边缘节点监控任务
|
||||
type NodeMonitorTask struct {
|
||||
intervalSeconds int
|
||||
|
||||
inactiveMap map[string]int // cluster@nodeId => count
|
||||
notifiedMap map[int64]int64 // nodeId => timestamp
|
||||
}
|
||||
|
||||
func NewNodeMonitorTask(intervalSeconds int) *NodeMonitorTask {
|
||||
return &NodeMonitorTask{
|
||||
intervalSeconds: intervalSeconds,
|
||||
inactiveMap: map[string]int{},
|
||||
notifiedMap: map[int64]int64{},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,24 +88,44 @@ func (this *NodeMonitorTask) monitorCluster(cluster *models.NodeCluster) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, node := range inactiveNodes {
|
||||
subject := "节点\"" + node.Name + "\"已处于离线状态"
|
||||
msg := "节点\"" + node.Name + "\"已处于离线状态"
|
||||
err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 修改在线状态
|
||||
err = models.SharedNodeDAO.UpdateNodeActive(nil, int64(node.Id), false)
|
||||
if err != nil {
|
||||
return err
|
||||
var nodeMap = map[int64]*models.Node{}
|
||||
for _, node := range inactiveNodes {
|
||||
var nodeId = int64(node.Id)
|
||||
nodeMap[nodeId] = node
|
||||
this.inactiveMap[types.String(clusterId)+"@"+types.String(nodeId)]++
|
||||
}
|
||||
|
||||
const maxInactiveTries = 5
|
||||
|
||||
// 处理现有的离线状态
|
||||
for key, count := range this.inactiveMap {
|
||||
var pieces = strings.Split(key, "@")
|
||||
if pieces[0] != types.String(clusterId) {
|
||||
continue
|
||||
}
|
||||
var nodeId = types.Int64(pieces[1])
|
||||
node, ok := nodeMap[nodeId]
|
||||
if ok {
|
||||
// 连续 N 次离线发送通知
|
||||
// 同时也要确保两次发送通知的时间不会过近
|
||||
if count >= maxInactiveTries && time.Now().Unix()-this.notifiedMap[nodeId] > 3600 {
|
||||
this.inactiveMap[key] = 0
|
||||
this.notifiedMap[nodeId] = time.Now().Unix()
|
||||
|
||||
subject := "节点\"" + node.Name + "\"已处于离线状态"
|
||||
msg := "集群'" + cluster.Name + "'节点\"" + node.Name + "\"已处于离线状态,请检查节点是否异常"
|
||||
err = models.SharedMessageDAO.CreateNodeMessage(nil, nodeconfigs.NodeRoleNode, clusterId, int64(node.Id), models.MessageTypeNodeInactive, models.LevelError, subject, msg, nil, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
delete(this.inactiveMap, key)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO 检查恢复连接
|
||||
|
||||
// 检查CPU、内存、磁盘不足节点,而且离线的节点不再重复提示
|
||||
// 检查CPU、内存、磁盘不足节点
|
||||
// TODO 需要实现
|
||||
|
||||
return nil
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
|
||||
"github.com/iwind/TeaGo/dbs"
|
||||
"testing"
|
||||
)
|
||||
@@ -8,10 +9,23 @@ import (
|
||||
func TestNodeMonitorTask_loop(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
|
||||
task := NewNodeMonitorTask(60)
|
||||
var task = NewNodeMonitorTask(60)
|
||||
err := task.loop()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestNodeMonitorTask_Monitor(t *testing.T) {
|
||||
dbs.NotifyReady()
|
||||
var task = NewNodeMonitorTask(60)
|
||||
for i := 0; i < 5; i++ {
|
||||
err := task.monitorCluster(&models.NodeCluster{
|
||||
Id: 42,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user