Compare commits

...

11 Commits

Author SHA1 Message Date
刘祥超
529016d4d5 版本号更改为1.2.5 2023-07-26 15:30:37 +08:00
刘祥超
63942bfb08 将版本号修改为1.2.4 2023-07-26 10:19:02 +08:00
刘祥超
f4e4f32f9c 修复SysLocker无法写入新Key的问题 2023-07-26 10:18:52 +08:00
刘祥超
0a3c740502 版本号修改为1.2.3 2023-07-25 13:17:59 +08:00
刘祥超
9a3438e066 优化IP名单使用IP搜索查询速度 2023-07-25 12:26:12 +08:00
刘祥超
814b82e1b6 优化TOA相关代码 2023-07-24 15:33:44 +08:00
刘祥超
89cfd175cd 优化TOA相关API 2023-07-24 09:56:43 +08:00
刘祥超
860816719e 单个节点所在多个集群共用一个缓存策略时只加载其中一个 2023-07-20 16:54:34 +08:00
刘祥超
caa936f0ac 大幅提升SysLocker自增性能 2023-07-20 14:25:42 +08:00
刘祥超
97836a89eb 优化代码 2023-07-19 18:49:23 +08:00
刘祥超
84483dce61 版本号更改为1.2.2 2023-07-18 14:33:53 +08:00
11 changed files with 248 additions and 34 deletions

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "1.2.1"
Version = "1.2.5"
ProductName = "Edge API"
ProcessName = "edge-api"
@@ -18,7 +18,7 @@ const (
// 其他节点版本号,用来检测是否有需要升级的节点
NodeVersion = "1.2.1"
NodeVersion = "1.2.5"
// SQLVersion SQL版本号
SQLVersion = "11"

View File

@@ -14,6 +14,7 @@ import (
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
"math"
"net"
"time"
)
@@ -498,7 +499,11 @@ func (this *IPItemDAO) CountAllEnabledIPItems(tx *dbs.Tx, sourceUserId int64, ke
}
}
if len(keyword) > 0 {
query.Like("ipFrom", dbutils.QuoteLike(keyword))
if net.ParseIP(keyword) != nil { // 是一个IP地址
query.Attr("ipFrom", keyword)
} else {
query.Like("ipFrom", dbutils.QuoteLike(keyword))
}
}
if len(ip) > 0 {
query.Attr("ipFrom", ip)
@@ -540,7 +545,11 @@ func (this *IPItemDAO) ListAllEnabledIPItems(tx *dbs.Tx, sourceUserId int64, key
}
}
if len(keyword) > 0 {
query.Like("ipFrom", dbutils.QuoteLike(keyword))
if net.ParseIP(keyword) != nil { // 是一个IP地址
query.Attr("ipFrom", keyword)
} else {
query.Like("ipFrom", dbutils.QuoteLike(keyword))
}
}
if len(ip) > 0 {
query.Attr("ipFrom", ip)

View File

@@ -647,10 +647,10 @@ func (this *NodeClusterDAO) FindClusterTOAConfig(tx *dbs.Tx, clusterId int64, ca
return nil, err
}
if !IsNotNull([]byte(toa)) {
return nodeconfigs.DefaultTOAConfig(), nil
return nodeconfigs.NewTOAConfig(), nil
}
config := &nodeconfigs.TOAConfig{}
var config = nodeconfigs.NewTOAConfig()
err = json.Unmarshal([]byte(toa), config)
if err != nil {
return nil, err
@@ -675,7 +675,7 @@ func (this *NodeClusterDAO) UpdateClusterTOA(tx *dbs.Tx, clusterId int64, toaJSO
if err != nil {
return err
}
return this.NotifyUpdate(tx, clusterId)
return this.NotifyTOAUpdate(tx, clusterId)
}
// CountAllEnabledNodeClustersWithHTTPCachePolicyId 计算使用某个缓存策略的集群数量
@@ -1454,12 +1454,13 @@ func (this *NodeClusterDAO) NotifyHTTPPagesPolicyUpdate(tx *dbs.Tx, clusterId in
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, 0, NodeTaskTypeHTTPPagesPolicyChanged)
}
// NotifyTOAUpdate 通知TOA变化
func (this *NodeClusterDAO) NotifyTOAUpdate(tx *dbs.Tx, clusterId int64) error {
return SharedNodeTaskDAO.CreateClusterTask(tx, nodeconfigs.NodeRoleNode, clusterId, 0, 0, NodeTaskTypeTOAChanged)
}
// NotifyDNSUpdate 通知DNS更新
// TODO 更新新的DNS解析记录的同时需要删除老的DNS解析记录
func (this *NodeClusterDAO) NotifyDNSUpdate(tx *dbs.Tx, clusterId int64) error {
err := dns.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dns.DNSTaskTypeClusterChange)
if err != nil {
return err
}
return nil
return dns.SharedDNSTaskDAO.CreateClusterTask(tx, clusterId, dns.DNSTaskTypeClusterChange)
}

View File

@@ -1089,6 +1089,9 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, dataMap *shared
config.HTTPCCPolicies = map[int64]*nodeconfigs.HTTPCCPolicy{}
config.HTTP3Policies = map[int64]*nodeconfigs.HTTP3Policy{}
config.HTTPPagesPolicies = map[int64]*nodeconfigs.HTTPPagesPolicy{}
var cachePolicyIds = []int64{}
var allowIPMaps = map[string]bool{}
for _, clusterId := range clusterIds {
nodeCluster, err := SharedNodeClusterDAO.FindClusterBasicInfo(tx, clusterId, cacheMap)
@@ -1128,12 +1131,15 @@ func (this *NodeDAO) ComposeNodeConfig(tx *dbs.Tx, nodeId int64, dataMap *shared
// 缓存策略
var httpCachePolicyId = int64(nodeCluster.CachePolicyId)
if httpCachePolicyId > 0 {
cachePolicy, err := SharedHTTPCachePolicyDAO.ComposeCachePolicy(tx, httpCachePolicyId, cacheMap)
if err != nil {
return nil, err
}
if cachePolicy != nil {
config.HTTPCachePolicies = append(config.HTTPCachePolicies, cachePolicy)
if !lists.ContainsInt64(cachePolicyIds, httpCachePolicyId) {
cachePolicyIds = append(cachePolicyIds, httpCachePolicyId)
cachePolicy, err := SharedHTTPCachePolicyDAO.ComposeCachePolicy(tx, httpCachePolicyId, cacheMap)
if err != nil {
return nil, err
}
if cachePolicy != nil {
config.HTTPCachePolicies = append(config.HTTPCachePolicies, cachePolicy)
}
}
}

View File

@@ -29,6 +29,7 @@ const (
NodeTaskTypeHTTPCCPolicyChanged NodeTaskType = "httpCCPolicyChanged" // CC策略变化
NodeTaskTypeHTTP3PolicyChanged NodeTaskType = "http3PolicyChanged" // HTTP3策略变化
NodeTaskTypeUpdatingServers NodeTaskType = "updatingServers" // 更新一组服务
NodeTaskTypeTOAChanged NodeTaskType = "toaChanged" // TOA配置变化
// NS相关
@@ -234,7 +235,7 @@ func (this *NodeTaskDAO) DeleteNodeTasks(tx *dbs.Tx, role string, nodeId int64)
}
// DeleteAllNodeTasks 删除所有节点相关任务
func (this *NodeTaskDAO)DeleteAllNodeTasks(tx *dbs.Tx) error {
func (this *NodeTaskDAO) DeleteAllNodeTasks(tx *dbs.Tx) error {
return this.Query(tx).
DeleteQuickly()
}

View File

@@ -2,7 +2,6 @@ package models
import (
"errors"
"github.com/TeaOSLab/EdgeAPI/internal/zero"
_ "github.com/go-sql-driver/mysql"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/dbs"
@@ -14,10 +13,6 @@ import (
type SysLockerDAO dbs.DAO
// concurrent transactions control
// 考虑到存在多个API节点的可能性容量不能太大也不能使用mutex
var sysLockerConcurrentLimiter = make(chan zero.Zero, 8)
func NewSysLockerDAO() *SysLockerDAO {
return dbs.NewDAO(&SysLockerDAO{
DAOObject: dbs.DAOObject{
@@ -71,7 +66,7 @@ func (this *SysLockerDAO) Lock(tx *dbs.Tx, key string, timeout int64) (ok bool,
}
// 如果已经有锁
locker := one.(*SysLocker)
var locker = one.(*SysLocker)
if time.Now().Unix() <= int64(locker.TimeoutAt) {
return false, nil
}
@@ -102,7 +97,7 @@ func (this *SysLockerDAO) Lock(tx *dbs.Tx, key string, timeout int64) (ok bool,
}
continue
}
if types.Int64(version) != int64(locker.Version)+1 {
if types.Int64(version) > int64(locker.Version)+1 {
return false, nil
}
@@ -119,6 +114,10 @@ func (this *SysLockerDAO) Unlock(tx *dbs.Tx, key string) error {
return err
}
const sysLockerStep = 8
var increment = NewSysLockerIncrement(sysLockerStep)
// Increase 增加版本号
func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (int64, error) {
// validate key
@@ -130,10 +129,22 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (
var result int64
var err error
sysLockerConcurrentLimiter <- zero.Zero{} // push
defer func() {
<-sysLockerConcurrentLimiter // pop
}()
{
colValue, err := this.Query(tx).
Result("version").
Attr("key", key).
FindInt64Col(0)
if err != nil {
return 0, err
}
var lastVersion = types.Int64(colValue)
if lastVersion <= increment.MaxValue(key) {
value, ok := increment.Pop(key)
if ok {
return value, nil
}
}
}
err = this.Instance.RunTx(func(tx *dbs.Tx) error {
result, err = this.Increase(tx, key, defaultValue)
@@ -146,7 +157,7 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (
}
// combine statements to make increasing faster
colValue, err := tx.FindCol(0, "INSERT INTO `"+this.Table+"` (`key`, `version`) VALUES ('"+key+"', "+types.String(defaultValue)+") ON DUPLICATE KEY UPDATE `version`=`version`+1; SELECT `version` FROM `"+this.Table+"` WHERE `key`='"+key+"'")
colValue, err := tx.FindCol(0, "INSERT INTO `"+this.Table+"` (`key`, `version`) VALUES ('"+key+"', "+types.String(defaultValue+sysLockerStep)+") ON DUPLICATE KEY UPDATE `version`=`version`+"+types.String(sysLockerStep)+"; SELECT `version` FROM `"+this.Table+"` WHERE `key`='"+key+"'")
if err != nil {
if CheckSQLErrCode(err, 1064 /** syntax error **/) {
// continue to use seperated query
@@ -155,7 +166,11 @@ func (this *SysLockerDAO) Increase(tx *dbs.Tx, key string, defaultValue int64) (
return 0, err
}
} else {
return types.Int64(colValue), nil
var maxVersion = types.Int64(colValue)
var minVersion = maxVersion - sysLockerStep + 1
increment.Push(key, minVersion+1, maxVersion)
return minVersion, nil
}
err = this.Query(tx).

View File

@@ -43,12 +43,54 @@ func TestSysLocker_Increase_SQL(t *testing.T) {
t.Log("after:", v)
}
func TestSysLocker_Increase_New_Key(t *testing.T) {
var key = "KEY" + types.String(time.Now().Unix())
var dao = NewSysLockerDAO()
value, err := dao.Read(nil, key)
if err != nil {
t.Fatal(err)
}
t.Log("before:", value)
for i := 0; i < 2; i++ {
v, err := dao.Increase(nil, key, 0)
if err != nil {
t.Log("err:", err)
return
}
t.Log("after:", v)
}
}
func TestSysLocker_Increase_Cache(t *testing.T) {
var dao = NewSysLockerDAO()
for i := 0; i < 11; i++ {
v, err := dao.Increase(nil, "hello", 0)
if err != nil {
t.Log("err:", err)
return
}
t.Log("hello", i, "after:", v)
}
for i := 0; i < 11; i++ {
v, err := dao.Increase(nil, "hello2", 0)
if err != nil {
t.Log("err:", err)
return
}
t.Log("hello2", i, "after:", v)
}
}
func TestSysLocker_Increase(t *testing.T) {
dbs.NotifyReady()
var dao = NewSysLockerDAO()
dao.Instance.Raw().SetMaxOpenConns(64)
var count = 1000
var dao = NewSysLockerDAO()
value, err := dao.Read(nil, "hello")
if err != nil {
t.Fatal(err)
@@ -133,6 +175,8 @@ func TestSysLocker_Increase_Performance(t *testing.T) {
func BenchmarkSysLockerDAO_Increase(b *testing.B) {
var dao = NewSysLockerDAO()
dao.Instance.Raw().SetMaxOpenConns(64)
_, _ = dao.Increase(nil, "hello", 0)
b.ResetTimer()

View File

@@ -0,0 +1,110 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package models
import (
"sync"
)
type SysLockerIncrementItem struct {
size int
c chan int64
maxValue int64
}
func NewSysLockerIncrementItem(size int) *SysLockerIncrementItem {
if size <= 0 {
size = 10
}
return &SysLockerIncrementItem{
size: size,
c: make(chan int64, size),
}
}
func (this *SysLockerIncrementItem) Pop() (result int64, ok bool) {
select {
case v := <-this.c:
result = v
ok = true
return
default:
return
}
}
func (this *SysLockerIncrementItem) Push(value int64) {
if this.maxValue < value {
this.maxValue = value
}
select {
case this.c <- value:
default:
}
}
func (this *SysLockerIncrementItem) Reset() {
close(this.c)
this.c = make(chan int64, this.size)
}
func (this *SysLockerIncrementItem) MaxValue() int64 {
return this.maxValue
}
type SysLockerIncrement struct {
itemMap map[string]*SysLockerIncrementItem // key => item
size int
locker sync.RWMutex
}
func NewSysLockerIncrement(size int) *SysLockerIncrement {
if size <= 0 {
size = 10
}
return &SysLockerIncrement{
itemMap: map[string]*SysLockerIncrementItem{},
size: size,
}
}
func (this *SysLockerIncrement) Pop(key string) (result int64, ok bool) {
this.locker.Lock()
defer this.locker.Unlock()
item, itemOk := this.itemMap[key]
if itemOk {
result, ok = item.Pop()
}
return
}
func (this *SysLockerIncrement) Push(key string, minValue int64, maxValue int64) {
this.locker.Lock()
defer this.locker.Unlock()
item, itemOk := this.itemMap[key]
if itemOk {
item.Reset()
} else {
item = NewSysLockerIncrementItem(this.size)
this.itemMap[key] = item
}
for i := minValue; i <= maxValue; i++ {
item.Push(i)
}
}
func (this *SysLockerIncrement) MaxValue(key string) int64 {
this.locker.RLock()
defer this.locker.RUnlock()
item, itemOk := this.itemMap[key]
if itemOk {
return item.MaxValue()
}
return 0
}

View File

@@ -0,0 +1,23 @@
// Copyright 2023 GoEdge CDN goedge.cdn@gmail.com. All rights reserved. Official site: https://goedge.cn .
package models_test
import (
"github.com/TeaOSLab/EdgeAPI/internal/db/models"
"testing"
)
func TestNewSysLockerIncrement(t *testing.T) {
var increment = models.NewSysLockerIncrement(10)
increment.Push("key", 1, 10)
t.Log(increment.MaxValue("key"))
for i := 0; i < 11; i++ {
result, value := increment.Pop("key")
t.Log(i, "=>", result, value)
}
for i := 0; i < 11; i++ {
result, value := increment.Pop("key1")
t.Log(i, "=>", result, value)
}
}

View File

@@ -1102,7 +1102,7 @@ func (this *NodeClusterService) FindEnabledNodeClusterConfigInfo(ctx context.Con
// toa
if models.IsNotNull(cluster.Toa) {
var toaConfig = &nodeconfigs.TOAConfig{}
var toaConfig = nodeconfigs.NewTOAConfig()
err = json.Unmarshal(cluster.Toa, toaConfig)
if err != nil {
return nil, err

View File

@@ -47,3 +47,8 @@ func (this *NodeService) CopyNodeActionsToNodeGroup(ctx context.Context, req *pb
func (this *NodeService) CopyNodeActionsToNodeCluster(ctx context.Context, req *pb.CopyNodeActionsToNodeClusterRequest) (*pb.RPCSuccess, error) {
return nil, this.NotImplementedYet()
}
// FindNodeTOAConfig 查找节点的TOA配置
func (this *NodeService) FindNodeTOAConfig(ctx context.Context, req *pb.FindNodeTOAConfigRequest) (*pb.FindNodeTOAConfigResponse, error) {
return nil, this.NotImplementedYet()
}