Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
496ee6cfa0 | ||
|
|
437914a321 | ||
|
|
3a93bf756a | ||
|
|
38d81f340e | ||
|
|
889b9d063a | ||
|
|
4c73b3618f | ||
|
|
df5f50682a | ||
|
|
9545bf69db | ||
|
|
b2f18c22ee | ||
|
|
63e3b7ac2f | ||
|
|
760a62c286 | ||
|
|
296848a6d6 | ||
|
|
4e04534244 | ||
|
|
cad43e610d | ||
|
|
b21bb8ee62 | ||
|
|
8caa03175c | ||
|
|
d9d06b7be9 | ||
|
|
1192f15676 | ||
|
|
ebf4d41290 | ||
|
|
a9ec78afb4 | ||
|
|
4526633027 | ||
|
|
c7ddd0adda | ||
|
|
ca07a6141b | ||
|
|
6d0f90747e | ||
|
|
bce8fd5ea3 | ||
|
|
d1fcbb46a3 | ||
|
|
d24f53477a |
@@ -6,3 +6,4 @@
|
||||
./build.sh linux mips64
|
||||
./build.sh linux mips64le
|
||||
./build.sh darwin amd64
|
||||
./build.sh darwin arm64
|
||||
@@ -33,6 +33,7 @@ function build() {
|
||||
mkdir $DIST/bin
|
||||
mkdir $DIST/configs
|
||||
mkdir $DIST/logs
|
||||
mkdir $DIST/data
|
||||
fi
|
||||
|
||||
cp $ROOT/configs/api.template.yaml $DIST/configs
|
||||
|
||||
1
build/data/.gitignore
vendored
Normal file
1
build/data/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
index.*
|
||||
3
go.mod
3
go.mod
@@ -13,11 +13,14 @@ require (
|
||||
github.com/go-yaml/yaml v2.1.0+incompatible
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f
|
||||
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7
|
||||
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
|
||||
github.com/mattn/go-sqlite3 v1.14.7
|
||||
github.com/mssola/user_agent v0.5.2
|
||||
github.com/shirou/gopsutil v2.20.9+incompatible
|
||||
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
|
||||
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299
|
||||
golang.org/x/text v0.3.2
|
||||
google.golang.org/grpc v1.32.0
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
|
||||
)
|
||||
|
||||
4
go.sum
4
go.sum
@@ -58,6 +58,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
|
||||
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
|
||||
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f h1:6Ws2H+eorfVUoMO2jta6A9nIdh8oi5/5LXo/LkAxR+E=
|
||||
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
|
||||
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7 h1:apv23QzWNmv0D76gB3+u/5kf0F/Yw4W8h489CWUZtss=
|
||||
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
@@ -65,6 +67,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4=
|
||||
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI=
|
||||
github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA=
|
||||
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
|
||||
@@ -10,12 +10,12 @@ const (
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
Type ItemType
|
||||
Key string
|
||||
ExpiredAt int64
|
||||
HeaderSize int64
|
||||
BodySize int64
|
||||
MetaSize int64
|
||||
Type ItemType `json:"type"`
|
||||
Key string `json:"key"`
|
||||
ExpiredAt int64 `json:"expiredAt"`
|
||||
HeaderSize int64 `json:"headerSize"`
|
||||
BodySize int64 `json:"bodySize"`
|
||||
MetaSize int64 `json:"metaSize"`
|
||||
}
|
||||
|
||||
func (this *Item) IsExpired() bool {
|
||||
@@ -23,7 +23,7 @@ func (this *Item) IsExpired() bool {
|
||||
}
|
||||
|
||||
func (this *Item) TotalSize() int64 {
|
||||
return this.Size() + this.MetaSize + int64(len(this.Key))
|
||||
return this.Size() + this.MetaSize + int64(len(this.Key)) + 64
|
||||
}
|
||||
|
||||
func (this *Item) Size() int64 {
|
||||
|
||||
@@ -1,145 +1,33 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
type ListInterface interface {
|
||||
Init() error
|
||||
|
||||
// 缓存列表管理
|
||||
type List struct {
|
||||
m map[string]*Item // hash => item
|
||||
locker sync.RWMutex
|
||||
onAdd func(item *Item)
|
||||
onRemove func(item *Item)
|
||||
}
|
||||
|
||||
func NewList() *List {
|
||||
return &List{
|
||||
m: map[string]*Item{},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *List) Reset() {
|
||||
this.locker.Lock()
|
||||
this.m = map[string]*Item{}
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *List) Add(hash string, item *Item) {
|
||||
this.locker.Lock()
|
||||
if this.onAdd != nil {
|
||||
this.onAdd(item)
|
||||
}
|
||||
this.m[hash] = item
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
func (this *List) Exist(hash string) bool {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
item, ok := this.m[hash]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return !item.IsExpired()
|
||||
}
|
||||
|
||||
// 根据前缀进行查找
|
||||
func (this *List) FindKeysWithPrefix(prefix string) (keys []string) {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
// TODO 需要优化性能,支持千万级数据低于1s的处理速度
|
||||
for _, item := range this.m {
|
||||
if strings.HasPrefix(item.Key, prefix) {
|
||||
keys = append(keys, item.Key)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *List) Remove(hash string) {
|
||||
this.locker.Lock()
|
||||
|
||||
item, ok := this.m[hash]
|
||||
if ok {
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
delete(this.m, hash)
|
||||
}
|
||||
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
// 清理过期的缓存
|
||||
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
|
||||
// callback 每次发现过期key的调用
|
||||
func (this *List) Purge(count int, callback func(hash string)) {
|
||||
this.locker.Lock()
|
||||
deletedHashList := []string{}
|
||||
for hash, item := range this.m {
|
||||
if count <= 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if item.IsExpired() {
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
delete(this.m, hash)
|
||||
deletedHashList = append(deletedHashList, hash)
|
||||
}
|
||||
|
||||
count--
|
||||
}
|
||||
this.locker.Unlock()
|
||||
|
||||
// 执行外部操作
|
||||
for _, hash := range deletedHashList {
|
||||
if callback != nil {
|
||||
callback(hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *List) Stat(check func(hash string) bool) *Stat {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
result := &Stat{
|
||||
Count: 0,
|
||||
Size: 0,
|
||||
}
|
||||
for hash, item := range this.m {
|
||||
if !item.IsExpired() {
|
||||
// 检查文件是否存在、内容是否正确等
|
||||
if check != nil && check(hash) {
|
||||
result.Count++
|
||||
result.ValueSize += item.Size()
|
||||
result.Size += item.TotalSize()
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// 总数量
|
||||
func (this *List) Count() int64 {
|
||||
this.locker.RLock()
|
||||
count := int64(len(this.m))
|
||||
this.locker.RUnlock()
|
||||
return count
|
||||
}
|
||||
|
||||
// 添加事件
|
||||
func (this *List) OnAdd(f func(item *Item)) {
|
||||
this.onAdd = f
|
||||
}
|
||||
|
||||
// 删除事件
|
||||
func (this *List) OnRemove(f func(item *Item)) {
|
||||
this.onRemove = f
|
||||
Reset() error
|
||||
|
||||
Add(hash string, item *Item) error
|
||||
|
||||
Exist(hash string) (bool, error)
|
||||
|
||||
// FindKeysWithPrefix 根据前缀进行查找
|
||||
FindKeysWithPrefix(prefix string) (keys []string, err error)
|
||||
|
||||
Remove(hash string) error
|
||||
|
||||
Purge(count int, callback func(hash string) error) error
|
||||
|
||||
CleanAll() error
|
||||
|
||||
Stat(check func(hash string) bool) (*Stat, error)
|
||||
|
||||
// Count 总数量
|
||||
Count() (int64, error)
|
||||
|
||||
// OnAdd 添加事件
|
||||
OnAdd(f func(item *Item))
|
||||
|
||||
// OnRemove 删除事件
|
||||
OnRemove(f func(item *Item))
|
||||
}
|
||||
|
||||
260
internal/caches/list_file.go
Normal file
260
internal/caches/list_file.go
Normal file
@@ -0,0 +1,260 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// FileList 文件缓存列表管理
|
||||
type FileList struct {
|
||||
dir string
|
||||
db *sql.DB
|
||||
total int64
|
||||
|
||||
onAdd func(item *Item)
|
||||
onRemove func(item *Item)
|
||||
}
|
||||
|
||||
func NewFileList(dir string) ListInterface {
|
||||
return &FileList{dir: dir}
|
||||
}
|
||||
|
||||
func (this *FileList) Init() error {
|
||||
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
_, err = db.Exec("VACUUM")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 创建
|
||||
// TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容
|
||||
// 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确
|
||||
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" (
|
||||
"hash" varchar(32),
|
||||
"key" varchar(1024),
|
||||
"headerSize" integer DEFAULT 0,
|
||||
"bodySize" integer DEFAULT 0,
|
||||
"metaSize" integer DEFAULT 0,
|
||||
"expiredAt" integer DEFAULT 0,
|
||||
"accessedAt" integer DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
|
||||
ON "cacheItems" (
|
||||
"hash"
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS "expiredAt"
|
||||
ON "cacheItems" (
|
||||
"expiredAt"
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS "accessedAt"
|
||||
ON "cacheItems" (
|
||||
"accessedAt"
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.db = db
|
||||
|
||||
// 读取总数量
|
||||
row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems")
|
||||
if row.Err() != nil {
|
||||
return row.Err()
|
||||
}
|
||||
var total int64
|
||||
err = row.Scan(&total)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.total = total
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileList) Reset() error {
|
||||
// 不错任何事情
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileList) Add(hash string, item *Item) error {
|
||||
_, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
atomic.AddInt64(&this.total, 1)
|
||||
|
||||
if this.onAdd != nil {
|
||||
this.onAdd(item)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileList) Exist(hash string) (bool, error) {
|
||||
row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
|
||||
if row == nil {
|
||||
return false, nil
|
||||
}
|
||||
if row.Err() != nil {
|
||||
return false, row.Err()
|
||||
}
|
||||
var bodySize int
|
||||
err := row.Scan(&bodySize)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// FindKeysWithPrefix 根据前缀进行查找
|
||||
func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
|
||||
if len(prefix) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO 需要优化上千万结果的情况
|
||||
|
||||
rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
for rows.Next() {
|
||||
var key string
|
||||
err = rows.Scan(&key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
keys = append(keys, key)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (this *FileList) Remove(hash string) error {
|
||||
row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
|
||||
if row.Err() != nil {
|
||||
return row.Err()
|
||||
}
|
||||
|
||||
var item = &Item{Type: ItemTypeFile}
|
||||
err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
atomic.AddInt64(&this.total, -1)
|
||||
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Purge 清理过期的缓存
|
||||
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
|
||||
// callback 每次发现过期key的调用
|
||||
func (this *FileList) Purge(count int, callback func(hash string) error) error {
|
||||
if count <= 0 {
|
||||
count = 1000
|
||||
}
|
||||
|
||||
rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = rows.Close()
|
||||
}()
|
||||
|
||||
hashStrings := []string{}
|
||||
for rows.Next() {
|
||||
var hash string
|
||||
err = rows.Scan(&hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
hashStrings = append(hashStrings, hash)
|
||||
}
|
||||
|
||||
// 不在 rows.Next() 循环中操作是为了避免死锁
|
||||
for _, hash := range hashStrings {
|
||||
err = this.Remove(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = callback(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileList) CleanAll() error {
|
||||
_, err := this.db.Exec("DELETE FROM cacheItems")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt64(&this.total, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
|
||||
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
|
||||
row := this.db.QueryRow("SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM cacheItems")
|
||||
if row.Err() != nil {
|
||||
return nil, row.Err()
|
||||
}
|
||||
stat := &Stat{}
|
||||
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
// Count 总数量
|
||||
// 常用的方法,所以避免直接查询数据库
|
||||
func (this *FileList) Count() (int64, error) {
|
||||
return atomic.LoadInt64(&this.total), nil
|
||||
}
|
||||
|
||||
// OnAdd 添加事件
|
||||
func (this *FileList) OnAdd(f func(item *Item)) {
|
||||
this.onAdd = f
|
||||
}
|
||||
|
||||
// OnRemove 删除事件
|
||||
func (this *FileList) OnRemove(f func(item *Item)) {
|
||||
this.onRemove = f
|
||||
}
|
||||
172
internal/caches/list_file_test.go
Normal file
172
internal/caches/list_file_test.go
Normal file
@@ -0,0 +1,172 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package caches
|
||||
|
||||
import (
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestFileList_Init(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileList_Add(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = list.Add(stringutil.Md5("123456"), &Item{
|
||||
Key: "123456",
|
||||
ExpiredAt: time.Now().Unix(),
|
||||
HeaderSize: 1,
|
||||
MetaSize: 2,
|
||||
BodySize: 3,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileList_Add_Many(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 100_0000; i++ {
|
||||
u := "http://edge.teaos.cn/123456" + strconv.Itoa(i)
|
||||
err = list.Add(stringutil.Md5(u), &Item{
|
||||
Key: u,
|
||||
ExpiredAt: time.Now().Unix(),
|
||||
HeaderSize: 1,
|
||||
MetaSize: 2,
|
||||
BodySize: 3,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileList_Exist(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
{
|
||||
exists, err := list.Exist(stringutil.Md5("123456"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("exists:", exists)
|
||||
}
|
||||
{
|
||||
exists, err := list.Exist(stringutil.Md5("654321"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("exists:", exists)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFileList_FindKeysWithPrefix(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
before := time.Now()
|
||||
keys, err := list.FindKeysWithPrefix("1234")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("keys:", keys)
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
func TestFileList_Remove(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
list.OnRemove(func(item *Item) {
|
||||
t.Logf("remove %#v", item)
|
||||
})
|
||||
err = list.Remove(stringutil.Md5("123456"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileList_Purge(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = list.Purge(2, func(hash string) error {
|
||||
t.Log(hash)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
func TestFileList_Stat(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
stat, err := list.Stat(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("count:", stat.Count, "size:", stat.Size, "valueSize:", stat.ValueSize)
|
||||
}
|
||||
|
||||
func TestFileList_Count(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
before := time.Now()
|
||||
count, err := list.Count()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("count:", count)
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}
|
||||
|
||||
func TestFileList_CleanAll(t *testing.T) {
|
||||
list := NewFileList(Tea.Root + "/data")
|
||||
err := list.Init()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = list.CleanAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("ok")
|
||||
t.Log(list.Count())
|
||||
}
|
||||
161
internal/caches/list_memory.go
Normal file
161
internal/caches/list_memory.go
Normal file
@@ -0,0 +1,161 @@
|
||||
package caches
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// MemoryList 内存缓存列表管理
|
||||
type MemoryList struct {
|
||||
m map[string]*Item // hash => item
|
||||
locker sync.RWMutex
|
||||
onAdd func(item *Item)
|
||||
onRemove func(item *Item)
|
||||
}
|
||||
|
||||
func NewMemoryList() ListInterface {
|
||||
return &MemoryList{
|
||||
m: map[string]*Item{},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *MemoryList) Init() error {
|
||||
// 内存列表不需要初始化
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MemoryList) Reset() error {
|
||||
this.locker.Lock()
|
||||
this.m = map[string]*Item{}
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MemoryList) Add(hash string, item *Item) error {
|
||||
this.locker.Lock()
|
||||
if this.onAdd != nil {
|
||||
this.onAdd(item)
|
||||
}
|
||||
this.m[hash] = item
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MemoryList) Exist(hash string) (bool, error) {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
item, ok := this.m[hash]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return !item.IsExpired(), nil
|
||||
}
|
||||
|
||||
// FindKeysWithPrefix 根据前缀进行查找
|
||||
func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
// TODO 需要优化性能,支持千万级数据低于1s的处理速度
|
||||
for _, item := range this.m {
|
||||
if strings.HasPrefix(item.Key, prefix) {
|
||||
keys = append(keys, item.Key)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *MemoryList) Remove(hash string) error {
|
||||
this.locker.Lock()
|
||||
|
||||
item, ok := this.m[hash]
|
||||
if ok {
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
delete(this.m, hash)
|
||||
}
|
||||
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Purge 清理过期的缓存
|
||||
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
|
||||
// callback 每次发现过期key的调用
|
||||
func (this *MemoryList) Purge(count int, callback func(hash string) error) error {
|
||||
this.locker.Lock()
|
||||
deletedHashList := []string{}
|
||||
for hash, item := range this.m {
|
||||
if count <= 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if item.IsExpired() {
|
||||
if this.onRemove != nil {
|
||||
this.onRemove(item)
|
||||
}
|
||||
delete(this.m, hash)
|
||||
deletedHashList = append(deletedHashList, hash)
|
||||
}
|
||||
|
||||
count--
|
||||
}
|
||||
this.locker.Unlock()
|
||||
|
||||
// 执行外部操作
|
||||
for _, hash := range deletedHashList {
|
||||
if callback != nil {
|
||||
err := callback(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MemoryList) CleanAll() error {
|
||||
return this.Reset()
|
||||
}
|
||||
|
||||
func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
result := &Stat{
|
||||
Count: 0,
|
||||
Size: 0,
|
||||
}
|
||||
for hash, item := range this.m {
|
||||
if !item.IsExpired() {
|
||||
// 检查文件是否存在、内容是否正确等
|
||||
if check != nil && check(hash) {
|
||||
result.Count++
|
||||
result.ValueSize += item.Size()
|
||||
result.Size += item.TotalSize()
|
||||
}
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Count 总数量
|
||||
func (this *MemoryList) Count() (int64, error) {
|
||||
this.locker.RLock()
|
||||
count := int64(len(this.m))
|
||||
this.locker.RUnlock()
|
||||
return count, nil
|
||||
}
|
||||
|
||||
// OnAdd 添加事件
|
||||
func (this *MemoryList) OnAdd(f func(item *Item)) {
|
||||
this.onAdd = f
|
||||
}
|
||||
|
||||
// OnRemove 删除事件
|
||||
func (this *MemoryList) OnRemove(f func(item *Item)) {
|
||||
this.onRemove = f
|
||||
}
|
||||
@@ -10,13 +10,13 @@ import (
|
||||
)
|
||||
|
||||
func TestList_Add(t *testing.T) {
|
||||
list := NewList()
|
||||
list.Add("a", &Item{
|
||||
list := &MemoryList{}
|
||||
_ = list.Add("a", &Item{
|
||||
Key: "a1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("b", &Item{
|
||||
_ = list.Add("b", &Item{
|
||||
Key: "b1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
@@ -25,72 +25,73 @@ func TestList_Add(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestList_Remove(t *testing.T) {
|
||||
list := NewList()
|
||||
list.Add("a", &Item{
|
||||
list := &MemoryList{}
|
||||
_ = list.Add("a", &Item{
|
||||
Key: "a1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("b", &Item{
|
||||
_ = list.Add("b", &Item{
|
||||
Key: "b1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Remove("b")
|
||||
_ = list.Remove("b")
|
||||
t.Log(list.m)
|
||||
}
|
||||
|
||||
func TestList_Purge(t *testing.T) {
|
||||
list := NewList()
|
||||
list.Add("a", &Item{
|
||||
list := &MemoryList{}
|
||||
_ = list.Add("a", &Item{
|
||||
Key: "a1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("b", &Item{
|
||||
_ = list.Add("b", &Item{
|
||||
Key: "b1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("c", &Item{
|
||||
_ = list.Add("c", &Item{
|
||||
Key: "c1",
|
||||
ExpiredAt: time.Now().Unix() - 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("d", &Item{
|
||||
_ = list.Add("d", &Item{
|
||||
Key: "d1",
|
||||
ExpiredAt: time.Now().Unix() - 2,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Purge(100, func(hash string) {
|
||||
_ = list.Purge(100, func(hash string) error {
|
||||
t.Log("delete:", hash)
|
||||
return nil
|
||||
})
|
||||
t.Log(list.m)
|
||||
}
|
||||
|
||||
func TestList_Stat(t *testing.T) {
|
||||
list := NewList()
|
||||
list.Add("a", &Item{
|
||||
list := &MemoryList{}
|
||||
_ = list.Add("a", &Item{
|
||||
Key: "a1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("b", &Item{
|
||||
_ = list.Add("b", &Item{
|
||||
Key: "b1",
|
||||
ExpiredAt: time.Now().Unix() + 3600,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("c", &Item{
|
||||
_ = list.Add("c", &Item{
|
||||
Key: "c1",
|
||||
ExpiredAt: time.Now().Unix(),
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
list.Add("d", &Item{
|
||||
_ = list.Add("d", &Item{
|
||||
Key: "d1",
|
||||
ExpiredAt: time.Now().Unix() - 2,
|
||||
HeaderSize: 1024,
|
||||
})
|
||||
result := list.Stat(func(hash string) bool {
|
||||
result, _ := list.Stat(func(hash string) bool {
|
||||
// 随机测试
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
return rand.Int()%2 == 0
|
||||
@@ -99,11 +100,11 @@ func TestList_Stat(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestList_FindKeysWithPrefix(t *testing.T) {
|
||||
list := NewList()
|
||||
list := &MemoryList{}
|
||||
before := time.Now()
|
||||
for i := 0; i < 1_000_000; i++ {
|
||||
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
|
||||
list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
|
||||
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
|
||||
Key: key,
|
||||
ExpiredAt: 0,
|
||||
BodySize: 0,
|
||||
@@ -113,7 +114,10 @@ func TestList_FindKeysWithPrefix(t *testing.T) {
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
|
||||
before = time.Now()
|
||||
keys := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000")
|
||||
keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(len(keys))
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package caches
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"strconv"
|
||||
@@ -10,12 +11,18 @@ import (
|
||||
|
||||
var SharedManager = NewManager()
|
||||
|
||||
// Manager 缓存策略管理器
|
||||
type Manager struct {
|
||||
// 全局配置
|
||||
MaxDiskCapacity *shared.SizeCapacity
|
||||
MaxMemoryCapacity *shared.SizeCapacity
|
||||
|
||||
policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy
|
||||
storageMap map[int64]StorageInterface // policyId => *Storage
|
||||
locker sync.RWMutex
|
||||
}
|
||||
|
||||
// NewManager 获取管理器对象
|
||||
func NewManager() *Manager {
|
||||
return &Manager{
|
||||
policyMap: map[int64]*serverconfigs.HTTPCachePolicy{},
|
||||
@@ -23,7 +30,7 @@ func NewManager() *Manager {
|
||||
}
|
||||
}
|
||||
|
||||
// 重新设置策略
|
||||
// UpdatePolicies 重新设置策略
|
||||
func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy) {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
@@ -103,7 +110,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
|
||||
}
|
||||
}
|
||||
|
||||
// 获取Policy信息
|
||||
// FindPolicy 获取Policy信息
|
||||
func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
@@ -112,7 +119,7 @@ func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
|
||||
return p
|
||||
}
|
||||
|
||||
// 根据策略ID查找存储
|
||||
// FindStorageWithPolicy 根据策略ID查找存储
|
||||
func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
@@ -121,7 +128,7 @@ func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
|
||||
return storage
|
||||
}
|
||||
|
||||
// 根据策略获取存储对象
|
||||
// NewStorageWithPolicy 根据策略获取存储对象
|
||||
func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface {
|
||||
switch policy.Type {
|
||||
case serverconfigs.CachePolicyStorageFile:
|
||||
@@ -131,3 +138,27 @@ func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TotalDiskSize 消耗的磁盘尺寸
|
||||
func (this *Manager) TotalDiskSize() int64 {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
total := int64(0)
|
||||
for _, storage := range this.storageMap {
|
||||
total += storage.TotalDiskSize()
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
// TotalMemorySize 消耗的内存尺寸
|
||||
func (this *Manager) TotalMemorySize() int64 {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
total := int64(0)
|
||||
for _, storage := range this.storageMap {
|
||||
total += storage.TotalMemorySize()
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
@@ -3,27 +3,30 @@ package caches
|
||||
type ReaderFunc func(n int) (goNext bool, err error)
|
||||
|
||||
type Reader interface {
|
||||
// 初始化
|
||||
// Init 初始化
|
||||
Init() error
|
||||
|
||||
// 状态码
|
||||
// TypeName 类型名称
|
||||
TypeName() string
|
||||
|
||||
// Status 状态码
|
||||
Status() int
|
||||
|
||||
// 读取Header
|
||||
// ReadHeader 读取Header
|
||||
ReadHeader(buf []byte, callback ReaderFunc) error
|
||||
|
||||
// 读取Body
|
||||
// ReadBody 读取Body
|
||||
ReadBody(buf []byte, callback ReaderFunc) error
|
||||
|
||||
// 读取某个范围内的Body
|
||||
// ReadBodyRange 读取某个范围内的Body
|
||||
ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error
|
||||
|
||||
// Header Size
|
||||
// HeaderSize Header Size
|
||||
HeaderSize() int64
|
||||
|
||||
// Body Size
|
||||
// BodySize Body Size
|
||||
BodySize() int64
|
||||
|
||||
// 关闭
|
||||
// Close 关闭
|
||||
Close() error
|
||||
}
|
||||
|
||||
@@ -106,6 +106,10 @@ func (this *FileReader) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *FileReader) TypeName() string {
|
||||
return "disk"
|
||||
}
|
||||
|
||||
func (this *FileReader) Status() int {
|
||||
return this.status
|
||||
}
|
||||
|
||||
@@ -16,6 +16,10 @@ func (this *MemoryReader) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MemoryReader) TypeName() string {
|
||||
return "memory"
|
||||
}
|
||||
|
||||
func (this *MemoryReader) Status() int {
|
||||
return this.item.Status
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
stringutil "github.com/iwind/TeaGo/utils/string"
|
||||
"golang.org/x/text/language"
|
||||
"golang.org/x/text/message"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -40,7 +42,7 @@ var (
|
||||
ErrInvalidRange = errors.New("invalid range")
|
||||
)
|
||||
|
||||
// 文件缓存
|
||||
// FileStorage 文件缓存
|
||||
// 文件结构:
|
||||
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
|
||||
type FileStorage struct {
|
||||
@@ -49,7 +51,7 @@ type FileStorage struct {
|
||||
memoryStorage *MemoryStorage // 一级缓存
|
||||
totalSize int64
|
||||
|
||||
list *List
|
||||
list ListInterface
|
||||
locker sync.RWMutex
|
||||
ticker *utils.Ticker
|
||||
}
|
||||
@@ -57,52 +59,20 @@ type FileStorage struct {
|
||||
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
|
||||
return &FileStorage{
|
||||
policy: policy,
|
||||
list: NewList(),
|
||||
}
|
||||
}
|
||||
|
||||
// 获取当前的Policy
|
||||
// Policy 获取当前的Policy
|
||||
func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
||||
return this.policy
|
||||
}
|
||||
|
||||
// 初始化
|
||||
// Init 初始化
|
||||
func (this *FileStorage) Init() error {
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.TotalSize())
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
||||
})
|
||||
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
before := time.Now()
|
||||
cacheDir := ""
|
||||
defer func() {
|
||||
// 统计
|
||||
count := 0
|
||||
size := int64(0)
|
||||
if this.list != nil {
|
||||
stat := this.list.Stat(func(hash string) bool {
|
||||
return true
|
||||
})
|
||||
count = stat.Count
|
||||
size = stat.Size
|
||||
}
|
||||
|
||||
cost := time.Since(before).Seconds() * 1000
|
||||
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
|
||||
if size > 1024*1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
|
||||
} else if size > 1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
|
||||
} else if size > 1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
||||
}
|
||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB)
|
||||
}()
|
||||
|
||||
// 配置
|
||||
cacheConfig := &serverconfigs.HTTPFileCacheStorage{}
|
||||
@@ -115,7 +85,7 @@ func (this *FileStorage) Init() error {
|
||||
return err
|
||||
}
|
||||
this.cacheConfig = cacheConfig
|
||||
cacheDir = cacheConfig.Dir
|
||||
cacheDir := cacheConfig.Dir
|
||||
|
||||
if !filepath.IsAbs(this.cacheConfig.Dir) {
|
||||
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
|
||||
@@ -127,6 +97,26 @@ func (this *FileStorage) Init() error {
|
||||
return errors.New("[CACHE]cache storage dir can not be empty")
|
||||
}
|
||||
|
||||
list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10))
|
||||
err = list.Init()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.list = list
|
||||
stat, err := list.Stat(func(hash string) bool {
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
this.totalSize = stat.Size
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.TotalSize())
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
||||
})
|
||||
|
||||
// 检查目录是否存在
|
||||
_, err = os.Stat(dir)
|
||||
if err != nil {
|
||||
@@ -140,6 +130,23 @@ func (this *FileStorage) Init() error {
|
||||
}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// 统计
|
||||
count := stat.Count
|
||||
size := stat.Size
|
||||
|
||||
cost := time.Since(before).Seconds() * 1000
|
||||
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
|
||||
if size > 1024*1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
|
||||
} else if size > 1024*1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
|
||||
} else if size > 1024 {
|
||||
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
|
||||
}
|
||||
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
|
||||
}()
|
||||
|
||||
// 初始化list
|
||||
err = this.initList()
|
||||
if err != nil {
|
||||
@@ -148,14 +155,13 @@ func (this *FileStorage) Init() error {
|
||||
|
||||
// 加载内存缓存
|
||||
if this.cacheConfig.MemoryPolicy != nil {
|
||||
memoryCapacity := this.cacheConfig.MemoryPolicy.Capacity
|
||||
if memoryCapacity != nil && memoryCapacity.Count > 0 {
|
||||
if this.cacheConfig.MemoryPolicy.Capacity != nil && this.cacheConfig.MemoryPolicy.Capacity.Count > 0 {
|
||||
memoryPolicy := &serverconfigs.HTTPCachePolicy{
|
||||
Id: this.policy.Id,
|
||||
IsOn: this.policy.IsOn,
|
||||
Name: this.policy.Name,
|
||||
Description: this.policy.Description,
|
||||
Capacity: memoryCapacity,
|
||||
Capacity: this.cacheConfig.MemoryPolicy.Capacity,
|
||||
MaxKeys: this.policy.MaxKeys,
|
||||
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
|
||||
Type: serverconfigs.CachePolicyStorageMemory,
|
||||
@@ -189,10 +195,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
|
||||
}
|
||||
}
|
||||
|
||||
hash, path := this.keyPath(key)
|
||||
if !this.list.Exist(hash) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
_, path := this.keyPath(key)
|
||||
|
||||
// TODO 尝试使用mmap加快读取速度
|
||||
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
|
||||
@@ -214,7 +217,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
// 打开缓存文件等待写入
|
||||
// OpenWriter 打开缓存文件等待写入
|
||||
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
|
||||
// 先尝试内存缓存
|
||||
if this.memoryStorage != nil {
|
||||
@@ -225,16 +228,21 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
|
||||
}
|
||||
|
||||
// 检查是否超出最大值
|
||||
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
|
||||
count, err := this.list.Count()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
|
||||
return nil, errors.New("write file cache failed: too many keys in cache storage")
|
||||
}
|
||||
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
|
||||
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
|
||||
capacityBytes := this.diskCapacityBytes()
|
||||
if capacityBytes > 0 && capacityBytes <= this.totalSize {
|
||||
return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
|
||||
}
|
||||
|
||||
hash := stringutil.Md5(key)
|
||||
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
|
||||
_, err := os.Stat(dir)
|
||||
_, err = os.Stat(dir)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
@@ -246,7 +254,10 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
|
||||
}
|
||||
|
||||
// 先删除
|
||||
this.list.Remove(hash)
|
||||
err = this.list.Remove(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path := dir + "/" + hash + ".cache.tmp"
|
||||
writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666)
|
||||
@@ -340,7 +351,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
|
||||
return NewFileWriter(writer, key, expiredAt), nil
|
||||
}
|
||||
|
||||
// 添加到List
|
||||
// AddToList 添加到List
|
||||
func (this *FileStorage) AddToList(item *Item) {
|
||||
if this.memoryStorage != nil {
|
||||
if item.Type == ItemTypeMemory {
|
||||
@@ -351,10 +362,13 @@ func (this *FileStorage) AddToList(item *Item) {
|
||||
|
||||
item.MetaSize = SizeMeta
|
||||
hash := stringutil.Md5(item.Key)
|
||||
this.list.Add(hash, item)
|
||||
err := this.list.Add(hash, item)
|
||||
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
|
||||
remotelogs.Error("CACHE", "add to list failed: "+err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// 删除某个键值对应的缓存
|
||||
// Delete 删除某个键值对应的缓存
|
||||
func (this *FileStorage) Delete(key string) error {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
@@ -365,25 +379,28 @@ func (this *FileStorage) Delete(key string) error {
|
||||
}
|
||||
|
||||
hash, path := this.keyPath(key)
|
||||
this.list.Remove(hash)
|
||||
err := os.Remove(path)
|
||||
err := this.list.Remove(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = os.Remove(path)
|
||||
if err == nil || os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 统计
|
||||
// Stat 统计
|
||||
func (this *FileStorage) Stat() (*Stat, error) {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
return this.list.Stat(func(hash string) bool {
|
||||
return true
|
||||
}), nil
|
||||
})
|
||||
}
|
||||
|
||||
// 清除所有的缓存
|
||||
// CleanAll 清除所有的缓存
|
||||
func (this *FileStorage) CleanAll() error {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
@@ -393,7 +410,10 @@ func (this *FileStorage) CleanAll() error {
|
||||
_ = this.memoryStorage.CleanAll()
|
||||
}
|
||||
|
||||
this.list.Reset()
|
||||
err := this.list.CleanAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 删除缓存和目录
|
||||
// 不能直接删除子目录,比较危险
|
||||
@@ -441,7 +461,7 @@ func (this *FileStorage) CleanAll() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 清理过期的缓存
|
||||
// Purge 清理过期的缓存
|
||||
func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
@@ -455,7 +475,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
if urlType == "dir" {
|
||||
resultKeys := []string{}
|
||||
for _, key := range keys {
|
||||
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
|
||||
subKeys, err := this.list.FindKeysWithPrefix(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resultKeys = append(resultKeys, subKeys...)
|
||||
}
|
||||
keys = resultKeys
|
||||
}
|
||||
@@ -463,7 +487,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
// 文件
|
||||
for _, key := range keys {
|
||||
hash, path := this.keyPath(key)
|
||||
if !this.list.Exist(hash) {
|
||||
exists, err := this.list.Exist(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
err := os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
@@ -471,16 +499,19 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
|
||||
continue
|
||||
}
|
||||
|
||||
err := os.Remove(path)
|
||||
err = os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
this.list.Remove(hash)
|
||||
err = this.list.Remove(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 停止
|
||||
// Stop 停止
|
||||
func (this *FileStorage) Stop() {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
@@ -490,12 +521,25 @@ func (this *FileStorage) Stop() {
|
||||
this.memoryStorage.Stop()
|
||||
}
|
||||
|
||||
this.list.Reset()
|
||||
_ = this.list.Reset()
|
||||
if this.ticker != nil {
|
||||
this.ticker.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// TotalDiskSize 消耗的磁盘尺寸
|
||||
func (this *FileStorage) TotalDiskSize() int64 {
|
||||
return atomic.LoadInt64(&this.totalSize)
|
||||
}
|
||||
|
||||
// TotalMemorySize 内存尺寸
|
||||
func (this *FileStorage) TotalMemorySize() int64 {
|
||||
if this.memoryStorage == nil {
|
||||
return 0
|
||||
}
|
||||
return this.memoryStorage.TotalMemorySize()
|
||||
}
|
||||
|
||||
// 绝对路径
|
||||
func (this *FileStorage) dir() string {
|
||||
return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
|
||||
@@ -521,45 +565,23 @@ func (this *FileStorage) hashPath(hash string) (path string) {
|
||||
|
||||
// 初始化List
|
||||
func (this *FileStorage) initList() error {
|
||||
this.list.Reset()
|
||||
|
||||
dir := this.dir()
|
||||
|
||||
// 清除tmp
|
||||
files, err := filepath.Glob(dir + "/*/*/*.cache.tmp")
|
||||
err := this.list.Reset()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, path := range files {
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
|
||||
// 加载缓存
|
||||
files, err = filepath.Glob(dir + "/*/*/*.cache")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, path := range files {
|
||||
basename := filepath.Base(path)
|
||||
index := strings.LastIndex(basename, ".")
|
||||
if index < 0 {
|
||||
continue
|
||||
}
|
||||
hash := basename[:index]
|
||||
// 使用异步防止阻塞主线程
|
||||
go func() {
|
||||
dir := this.dir()
|
||||
|
||||
// 解析文件信息
|
||||
item, err := this.decodeFile(path)
|
||||
if err != nil {
|
||||
if err != ErrNotFound {
|
||||
remotelogs.Error("CACHE", "decode path '"+path+"': "+err.Error())
|
||||
// 清除tmp
|
||||
files, err := filepath.Glob(dir + "/*/*/*.cache.tmp")
|
||||
if err == nil && len(files) > 0 {
|
||||
for _, path := range files {
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if item == nil {
|
||||
continue
|
||||
}
|
||||
this.list.Add(hash, item)
|
||||
}
|
||||
}()
|
||||
|
||||
// 启动定时清理任务
|
||||
this.ticker = utils.NewTicker(30 * time.Second)
|
||||
@@ -673,12 +695,13 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
|
||||
|
||||
// 清理任务
|
||||
func (this *FileStorage) purgeLoop() {
|
||||
this.list.Purge(1000, func(hash string) {
|
||||
_ = this.list.Purge(1000, func(hash string) error {
|
||||
path := this.hashPath(hash)
|
||||
err := os.Remove(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@@ -709,3 +732,14 @@ func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byt
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *FileStorage) diskCapacityBytes() int64 {
|
||||
c1 := this.policy.CapacityBytes()
|
||||
if SharedManager.MaxDiskCapacity != nil {
|
||||
c2 := SharedManager.MaxDiskCapacity.Bytes()
|
||||
if c2 > 0 {
|
||||
return c2
|
||||
}
|
||||
}
|
||||
return c1
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestFileStorage_Init(t *testing.T) {
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
storage.purgeLoop()
|
||||
t.Log(len(storage.list.m), "entries left")
|
||||
t.Log(storage.list.(*FileList).total, "entries left")
|
||||
}
|
||||
|
||||
func TestFileStorage_OpenWriter(t *testing.T) {
|
||||
@@ -441,14 +441,16 @@ func TestFileStorage_CleanAll(t *testing.T) {
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}()
|
||||
|
||||
t.Log("before:", storage.list.m)
|
||||
c, _ := storage.list.Count()
|
||||
t.Log("before:", c)
|
||||
|
||||
err = storage.CleanAll()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Log("after:", storage.list.m)
|
||||
c, _ = storage.list.Count()
|
||||
t.Log("after:", c)
|
||||
t.Log("ok")
|
||||
}
|
||||
|
||||
|
||||
@@ -4,35 +4,41 @@ import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
)
|
||||
|
||||
// 缓存存储接口
|
||||
// StorageInterface 缓存存储接口
|
||||
type StorageInterface interface {
|
||||
// 初始化
|
||||
// Init 初始化
|
||||
Init() error
|
||||
|
||||
// 读取缓存
|
||||
// OpenReader 读取缓存
|
||||
OpenReader(key string) (Reader, error)
|
||||
|
||||
// 打开缓存写入器等待写入
|
||||
// OpenWriter 打开缓存写入器等待写入
|
||||
OpenWriter(key string, expiredAt int64, status int) (Writer, error)
|
||||
|
||||
// 删除某个键值对应的缓存
|
||||
// Delete 删除某个键值对应的缓存
|
||||
Delete(key string) error
|
||||
|
||||
// 统计缓存
|
||||
// Stat 统计缓存
|
||||
Stat() (*Stat, error)
|
||||
|
||||
// 清除所有缓存
|
||||
// TotalDiskSize 消耗的磁盘尺寸
|
||||
TotalDiskSize() int64
|
||||
|
||||
// TotalMemorySize 内存尺寸
|
||||
TotalMemorySize() int64
|
||||
|
||||
// CleanAll 清除所有缓存
|
||||
CleanAll() error
|
||||
|
||||
// 批量删除缓存
|
||||
// Purge 批量删除缓存
|
||||
Purge(keys []string, urlType string) error
|
||||
|
||||
// 停止缓存策略
|
||||
// Stop 停止缓存策略
|
||||
Stop()
|
||||
|
||||
// 获取当前存储的Policy
|
||||
// Policy 获取当前存储的Policy
|
||||
Policy() *serverconfigs.HTTPCachePolicy
|
||||
|
||||
// 将缓存添加到列表
|
||||
// AddToList 将缓存添加到列表
|
||||
AddToList(item *Item)
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ type MemoryItem struct {
|
||||
|
||||
type MemoryStorage struct {
|
||||
policy *serverconfigs.HTTPCachePolicy
|
||||
list *List
|
||||
list ListInterface
|
||||
locker *sync.RWMutex
|
||||
valuesMap map[uint64]*MemoryItem
|
||||
ticker *utils.Ticker
|
||||
@@ -33,19 +33,19 @@ type MemoryStorage struct {
|
||||
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
|
||||
return &MemoryStorage{
|
||||
policy: policy,
|
||||
list: NewList(),
|
||||
list: NewMemoryList(),
|
||||
locker: &sync.RWMutex{},
|
||||
valuesMap: map[uint64]*MemoryItem{},
|
||||
}
|
||||
}
|
||||
|
||||
// 初始化
|
||||
// Init 初始化
|
||||
func (this *MemoryStorage) Init() error {
|
||||
this.list.OnAdd(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, item.Size())
|
||||
atomic.AddInt64(&this.totalSize, item.TotalSize())
|
||||
})
|
||||
this.list.OnRemove(func(item *Item) {
|
||||
atomic.AddInt64(&this.totalSize, -item.Size())
|
||||
atomic.AddInt64(&this.totalSize, -item.TotalSize())
|
||||
})
|
||||
|
||||
if this.purgeDuration <= 0 {
|
||||
@@ -63,7 +63,7 @@ func (this *MemoryStorage) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 读取缓存
|
||||
// OpenReader 读取缓存
|
||||
func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
|
||||
hash := this.hash(key)
|
||||
|
||||
@@ -89,18 +89,23 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
// 打开缓存写入器等待写入
|
||||
// OpenWriter 打开缓存写入器等待写入
|
||||
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
|
||||
// 检查是否超出最大值
|
||||
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
|
||||
totalKeys, err := this.list.Count()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
|
||||
return nil, errors.New("write memory cache failed: too many keys in cache storage")
|
||||
}
|
||||
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
|
||||
capacityBytes := this.memoryCapacityBytes()
|
||||
if capacityBytes > 0 && capacityBytes <= this.totalSize {
|
||||
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
|
||||
}
|
||||
|
||||
// 先删除
|
||||
err := this.Delete(key)
|
||||
err = this.Delete(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -108,43 +113,47 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
|
||||
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil
|
||||
}
|
||||
|
||||
// 删除某个键值对应的缓存
|
||||
// Delete 删除某个键值对应的缓存
|
||||
func (this *MemoryStorage) Delete(key string) error {
|
||||
hash := this.hash(key)
|
||||
this.locker.Lock()
|
||||
delete(this.valuesMap, hash)
|
||||
this.list.Remove(fmt.Sprintf("%d", hash))
|
||||
_ = this.list.Remove(fmt.Sprintf("%d", hash))
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// 统计缓存
|
||||
// Stat 统计缓存
|
||||
func (this *MemoryStorage) Stat() (*Stat, error) {
|
||||
this.locker.RLock()
|
||||
defer this.locker.RUnlock()
|
||||
|
||||
return this.list.Stat(func(hash string) bool {
|
||||
return true
|
||||
}), nil
|
||||
})
|
||||
}
|
||||
|
||||
// 清除所有缓存
|
||||
// CleanAll 清除所有缓存
|
||||
func (this *MemoryStorage) CleanAll() error {
|
||||
this.locker.Lock()
|
||||
this.valuesMap = map[uint64]*MemoryItem{}
|
||||
this.list.Reset()
|
||||
_ = this.list.Reset()
|
||||
atomic.StoreInt64(&this.totalSize, 0)
|
||||
this.locker.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// 批量删除缓存
|
||||
// Purge 批量删除缓存
|
||||
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
|
||||
// 目录
|
||||
if urlType == "dir" {
|
||||
resultKeys := []string{}
|
||||
for _, key := range keys {
|
||||
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
|
||||
subKeys, err := this.list.FindKeysWithPrefix(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resultKeys = append(resultKeys, subKeys...)
|
||||
}
|
||||
keys = resultKeys
|
||||
}
|
||||
@@ -158,28 +167,38 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 停止缓存策略
|
||||
// Stop 停止缓存策略
|
||||
func (this *MemoryStorage) Stop() {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
this.valuesMap = map[uint64]*MemoryItem{}
|
||||
this.list.Reset()
|
||||
_ = this.list.Reset()
|
||||
if this.ticker != nil {
|
||||
this.ticker.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// 获取当前存储的Policy
|
||||
// Policy 获取当前存储的Policy
|
||||
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
|
||||
return this.policy
|
||||
}
|
||||
|
||||
// 将缓存添加到列表
|
||||
// AddToList 将缓存添加到列表
|
||||
func (this *MemoryStorage) AddToList(item *Item) {
|
||||
item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
|
||||
hash := fmt.Sprintf("%d", this.hash(item.Key))
|
||||
this.list.Add(hash, item)
|
||||
_ = this.list.Add(hash, item)
|
||||
}
|
||||
|
||||
// TotalDiskSize 消耗的磁盘尺寸
|
||||
func (this *MemoryStorage) TotalDiskSize() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// TotalMemorySize 内存尺寸
|
||||
func (this *MemoryStorage) TotalMemorySize() int64 {
|
||||
return atomic.LoadInt64(&this.totalSize)
|
||||
}
|
||||
|
||||
// 计算Key Hash
|
||||
@@ -189,12 +208,30 @@ func (this *MemoryStorage) hash(key string) uint64 {
|
||||
|
||||
// 清理任务
|
||||
func (this *MemoryStorage) purgeLoop() {
|
||||
this.list.Purge(2048, func(hash string) {
|
||||
_ = this.list.Purge(2048, func(hash string) error {
|
||||
uintHash, err := strconv.ParseUint(hash, 10, 64)
|
||||
if err == nil {
|
||||
this.locker.Lock()
|
||||
delete(this.valuesMap, uintHash)
|
||||
this.locker.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (this *MemoryStorage) memoryCapacityBytes() int64 {
|
||||
if this.policy == nil {
|
||||
return 0
|
||||
}
|
||||
c1 := int64(0)
|
||||
if this.policy.Capacity != nil {
|
||||
c1 = this.policy.Capacity.Bytes()
|
||||
}
|
||||
if SharedManager.MaxMemoryCapacity != nil {
|
||||
c2 := SharedManager.MaxMemoryCapacity.Bytes()
|
||||
if c2 > 0 {
|
||||
return c2
|
||||
}
|
||||
}
|
||||
return c1
|
||||
}
|
||||
|
||||
@@ -174,7 +174,8 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(storage.list.Count(), len(storage.valuesMap))
|
||||
total, _ := storage.list.Count()
|
||||
t.Log(total, len(storage.valuesMap))
|
||||
}
|
||||
|
||||
func TestMemoryStorage_Purge(t *testing.T) {
|
||||
@@ -208,7 +209,8 @@ func TestMemoryStorage_Purge(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(storage.list.Count(), len(storage.valuesMap))
|
||||
total, _ := storage.list.Count()
|
||||
t.Log(total, len(storage.valuesMap))
|
||||
}
|
||||
|
||||
func TestMemoryStorage_Expire(t *testing.T) {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package caches
|
||||
|
||||
import "compress/gzip"
|
||||
import (
|
||||
"compress/gzip"
|
||||
)
|
||||
|
||||
type gzipWriter struct {
|
||||
rawWriter Writer
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package teaconst
|
||||
|
||||
const (
|
||||
Version = "0.0.13"
|
||||
Version = "0.1.1"
|
||||
|
||||
ProductName = "Edge Node"
|
||||
ProcessName = "edge-node"
|
||||
@@ -11,6 +11,6 @@ const (
|
||||
EncryptKey = "8f983f4d69b83aaa0d74b21a212f6967"
|
||||
EncryptMethod = "aes-256-cfb"
|
||||
|
||||
// systemd
|
||||
// SystemdServiceName systemd
|
||||
SystemdServiceName = "edge-node"
|
||||
)
|
||||
|
||||
@@ -47,7 +47,7 @@ func (this *IPListManager) Start() {
|
||||
// 第一次读取
|
||||
err := this.loop()
|
||||
if err != nil {
|
||||
remotelogs.Println("IP_LIST_MANAGER", err.Error())
|
||||
remotelogs.Error("IP_LIST_MANAGER", err.Error())
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
@@ -64,7 +64,7 @@ func (this *IPListManager) Start() {
|
||||
if err != nil {
|
||||
countErrors++
|
||||
|
||||
remotelogs.Println("IP_LIST_MANAGER", err.Error())
|
||||
remotelogs.Error("IP_LIST_MANAGER", err.Error())
|
||||
|
||||
// 连续错误小于3次的我们立即重试
|
||||
if countErrors <= 3 {
|
||||
|
||||
10
internal/monitor/value.go
Normal file
10
internal/monitor/value.go
Normal file
@@ -0,0 +1,10 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package monitor
|
||||
|
||||
// ItemValue 数据值定义
|
||||
type ItemValue struct {
|
||||
Item string
|
||||
ValueJSON []byte
|
||||
CreatedAt int64
|
||||
}
|
||||
79
internal/monitor/value_queue.go
Normal file
79
internal/monitor/value_queue.go
Normal file
@@ -0,0 +1,79 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"time"
|
||||
)
|
||||
|
||||
var SharedValueQueue = NewValueQueue()
|
||||
|
||||
func init() {
|
||||
events.On(events.EventStart, func() {
|
||||
go SharedValueQueue.Start()
|
||||
})
|
||||
}
|
||||
|
||||
// ValueQueue 数据记录队列
|
||||
type ValueQueue struct {
|
||||
valuesChan chan *ItemValue
|
||||
}
|
||||
|
||||
func NewValueQueue() *ValueQueue {
|
||||
return &ValueQueue{
|
||||
valuesChan: make(chan *ItemValue, 1024),
|
||||
}
|
||||
}
|
||||
|
||||
// Start 启动队列
|
||||
func (this *ValueQueue) Start() {
|
||||
// 这里单次循环就行,因为Loop里已经使用了Range通道
|
||||
err := this.Loop()
|
||||
if err != nil {
|
||||
remotelogs.Error("MONITOR_QUEUE", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Add 添加数据
|
||||
func (this *ValueQueue) Add(item string, value maps.Map) {
|
||||
valueJSON, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
remotelogs.Error("MONITOR_QUEUE", "marshal value error: "+err.Error())
|
||||
return
|
||||
}
|
||||
select {
|
||||
case this.valuesChan <- &ItemValue{
|
||||
Item: item,
|
||||
ValueJSON: valueJSON,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
}:
|
||||
default:
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Loop 单次循环
|
||||
func (this *ValueQueue) Loop() error {
|
||||
rpcClient, err := rpc.SharedRPC()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for value := range this.valuesChan {
|
||||
_, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{
|
||||
Item: value.Item,
|
||||
ValueJSON: value.ValueJSON,
|
||||
CreatedAt: value.CreatedAt,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -5,13 +5,13 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// HTTP客户端
|
||||
// HTTPClient HTTP客户端
|
||||
type HTTPClient struct {
|
||||
rawClient *http.Client
|
||||
accessAt int64
|
||||
}
|
||||
|
||||
// 获取新客户端对象
|
||||
// NewHTTPClient 获取新客户端对象
|
||||
func NewHTTPClient(rawClient *http.Client) *HTTPClient {
|
||||
return &HTTPClient{
|
||||
rawClient: rawClient,
|
||||
@@ -19,22 +19,22 @@ func NewHTTPClient(rawClient *http.Client) *HTTPClient {
|
||||
}
|
||||
}
|
||||
|
||||
// 获取原始客户端对象
|
||||
// RawClient 获取原始客户端对象
|
||||
func (this *HTTPClient) RawClient() *http.Client {
|
||||
return this.rawClient
|
||||
}
|
||||
|
||||
// 更新访问时间
|
||||
// UpdateAccessTime 更新访问时间
|
||||
func (this *HTTPClient) UpdateAccessTime() {
|
||||
this.accessAt = utils.UnixTime()
|
||||
}
|
||||
|
||||
// 获取访问时间
|
||||
// AccessTime 获取访问时间
|
||||
func (this *HTTPClient) AccessTime() int64 {
|
||||
return this.accessAt
|
||||
}
|
||||
|
||||
// 关闭
|
||||
// Close 关闭
|
||||
func (this *HTTPClient) Close() {
|
||||
this.rawClient.CloseIdleConnections()
|
||||
}
|
||||
|
||||
@@ -14,17 +14,17 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// HTTP客户端池单例
|
||||
// SharedHTTPClientPool HTTP客户端池单例
|
||||
var SharedHTTPClientPool = NewHTTPClientPool()
|
||||
|
||||
// 客户端池
|
||||
// HTTPClientPool 客户端池
|
||||
type HTTPClientPool struct {
|
||||
clientExpiredDuration time.Duration
|
||||
clientsMap map[string]*HTTPClient // backend key => client
|
||||
locker sync.Mutex
|
||||
}
|
||||
|
||||
// 获取新对象
|
||||
// NewHTTPClientPool 获取新对象
|
||||
func NewHTTPClientPool() *HTTPClientPool {
|
||||
pool := &HTTPClientPool{
|
||||
clientExpiredDuration: 3600 * time.Second,
|
||||
@@ -36,7 +36,7 @@ func NewHTTPClientPool() *HTTPClientPool {
|
||||
return pool
|
||||
}
|
||||
|
||||
// 根据地址获取客户端
|
||||
// Client 根据地址获取客户端
|
||||
func (this *HTTPClientPool) Client(req *http.Request, origin *serverconfigs.OriginConfig, originAddr string) (rawClient *http.Client, err error) {
|
||||
if origin.Addr == nil {
|
||||
return nil, errors.New("origin addr should not be empty (originId:" + strconv.FormatInt(origin.Id, 10) + ")")
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
|
||||
@@ -9,6 +10,7 @@ import (
|
||||
"github.com/TeaOSLab/EdgeNode/internal/stats"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"golang.org/x/net/http2"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -28,7 +30,7 @@ var bytePool1k = utils.NewBytePool(20480, 1024)
|
||||
var bytePool32k = utils.NewBytePool(20480, 32*1024)
|
||||
var bytePool128k = utils.NewBytePool(20480, 128*1024)
|
||||
|
||||
// HTTP请求
|
||||
// HTTPRequest HTTP请求
|
||||
type HTTPRequest struct {
|
||||
// 外部参数
|
||||
RawReq *http.Request
|
||||
@@ -95,7 +97,7 @@ func (this *HTTPRequest) init() {
|
||||
this.requestFromTime = time.Now()
|
||||
}
|
||||
|
||||
// 执行请求
|
||||
// Do 执行请求
|
||||
func (this *HTTPRequest) Do() {
|
||||
// 初始化
|
||||
this.init()
|
||||
@@ -191,6 +193,13 @@ func (this *HTTPRequest) doBegin() {
|
||||
}
|
||||
}
|
||||
|
||||
// Fastcgi
|
||||
if this.web.FastcgiRef != nil && this.web.FastcgiRef.IsOn && len(this.web.FastcgiList) > 0 {
|
||||
if this.doFastcgi() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// root
|
||||
if this.web.Root != nil && this.web.Root.IsOn {
|
||||
// 如果处理成功,则终止请求的处理
|
||||
@@ -210,9 +219,6 @@ func (this *HTTPRequest) doBegin() {
|
||||
return
|
||||
}
|
||||
|
||||
// Fastcgi
|
||||
// TODO
|
||||
|
||||
// 返回404页面
|
||||
this.write404()
|
||||
}
|
||||
@@ -229,7 +235,7 @@ func (this *HTTPRequest) doEnd() {
|
||||
}
|
||||
}
|
||||
|
||||
// 原始的请求URI
|
||||
// RawURI 原始的请求URI
|
||||
func (this *HTTPRequest) RawURI() string {
|
||||
return this.rawURI
|
||||
}
|
||||
@@ -332,6 +338,12 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
|
||||
this.web.StatRef = web.StatRef
|
||||
}
|
||||
|
||||
// fastcgi
|
||||
if web.FastcgiRef != nil && (web.FastcgiRef.IsPrior || isTop) {
|
||||
this.web.FastcgiRef = web.FastcgiRef
|
||||
this.web.FastcgiList = web.FastcgiList
|
||||
}
|
||||
|
||||
// 重写规则
|
||||
if len(web.RewriteRefs) > 0 {
|
||||
for index, ref := range web.RewriteRefs {
|
||||
@@ -433,7 +445,7 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
|
||||
return nil
|
||||
}
|
||||
|
||||
// 利用请求参数格式化字符串
|
||||
// Format 利用请求参数格式化字符串
|
||||
func (this *HTTPRequest) Format(source string) string {
|
||||
if len(source) == 0 {
|
||||
return ""
|
||||
@@ -1123,3 +1135,24 @@ func (this *HTTPRequest) bytePool(contentLength int64) *utils.BytePool {
|
||||
}
|
||||
return bytePool128k
|
||||
}
|
||||
|
||||
// 检查是否可以忽略错误
|
||||
func (this *HTTPRequest) canIgnore(err error) bool {
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
// 客户端主动取消
|
||||
if err == context.Canceled {
|
||||
return true
|
||||
}
|
||||
|
||||
// HTTP/2流错误
|
||||
{
|
||||
_, ok := err.(http2.StreamError)
|
||||
if ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -12,16 +12,26 @@ import (
|
||||
|
||||
// 读取缓存
|
||||
func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
if this.web.Cache == nil || !this.web.Cache.IsOn || len(this.web.Cache.CacheRefs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
cachePolicy := sharedNodeConfig.HTTPCachePolicy
|
||||
if cachePolicy == nil || !cachePolicy.IsOn {
|
||||
return
|
||||
}
|
||||
|
||||
// 检查条件
|
||||
if this.web.Cache == nil || !this.web.Cache.IsOn || (len(cachePolicy.CacheRefs) == 0 && len(this.web.Cache.CacheRefs) == 0) {
|
||||
return
|
||||
}
|
||||
var addStatusHeader = this.web.Cache.AddStatusHeader
|
||||
if addStatusHeader {
|
||||
defer func() {
|
||||
cacheStatus := this.varMapping["cache.status"]
|
||||
if cacheStatus != "HIT" {
|
||||
this.writer.Header().Set("X-Cache", cacheStatus)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 检查服务独立的缓存条件
|
||||
refType := ""
|
||||
for _, cacheRef := range this.web.Cache.CacheRefs {
|
||||
if !cacheRef.IsOn ||
|
||||
cacheRef.Conds == nil ||
|
||||
@@ -30,11 +40,28 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
}
|
||||
if cacheRef.Conds.MatchRequest(this.Format) {
|
||||
this.cacheRef = cacheRef
|
||||
refType = "server"
|
||||
break
|
||||
}
|
||||
}
|
||||
if this.cacheRef == nil {
|
||||
return
|
||||
// 检查策略默认的缓存条件
|
||||
for _, cacheRef := range cachePolicy.CacheRefs {
|
||||
if !cacheRef.IsOn ||
|
||||
cacheRef.Conds == nil ||
|
||||
!cacheRef.Conds.HasRequestConds() {
|
||||
continue
|
||||
}
|
||||
if cacheRef.Conds.MatchRequest(this.Format) {
|
||||
this.cacheRef = cacheRef
|
||||
refType = "policy"
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if this.cacheRef == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// 相关变量
|
||||
@@ -80,7 +107,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
return
|
||||
}
|
||||
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
@@ -88,6 +117,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
}()
|
||||
|
||||
this.varMapping["cache.status"] = "HIT"
|
||||
this.logAttrs["cache.status"] = "HIT"
|
||||
|
||||
// 读取Header
|
||||
headerBuf := []byte{}
|
||||
@@ -111,10 +141,15 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if addStatusHeader {
|
||||
this.writer.Header().Set("X-Cache", "HIT, "+refType+", "+reader.TypeName())
|
||||
}
|
||||
this.processResponseHeaders(reader.Status())
|
||||
|
||||
// 输出Body
|
||||
@@ -198,7 +233,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
|
||||
return true
|
||||
}
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
} else if len(rangeSet) > 1 {
|
||||
@@ -239,7 +276,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
return true, err
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
||||
@@ -260,7 +299,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
213
internal/nodes/http_request_fastcgi.go
Normal file
213
internal/nodes/http_request_fastcgi.go
Normal file
@@ -0,0 +1,213 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/iwind/TeaGo/Tea"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/iwind/TeaGo/rands"
|
||||
"github.com/iwind/TeaGo/types"
|
||||
"github.com/iwind/gofcgi/pkg"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
|
||||
fastcgiList := []*serverconfigs.HTTPFastcgiConfig{}
|
||||
for _, fastcgi := range this.web.FastcgiList {
|
||||
if !fastcgi.IsOn {
|
||||
continue
|
||||
}
|
||||
fastcgiList = append(fastcgiList, fastcgi)
|
||||
}
|
||||
if len(fastcgiList) == 0 {
|
||||
return false
|
||||
}
|
||||
shouldStop = true
|
||||
fastcgi := fastcgiList[rands.Int(0, len(fastcgiList)-1)]
|
||||
|
||||
env := fastcgi.FilterParams()
|
||||
if !env.Has("DOCUMENT_ROOT") {
|
||||
env["DOCUMENT_ROOT"] = ""
|
||||
}
|
||||
|
||||
if !env.Has("REMOTE_ADDR") {
|
||||
env["REMOTE_ADDR"] = this.requestRemoteAddr()
|
||||
}
|
||||
if !env.Has("QUERY_STRING") {
|
||||
u, err := url.ParseRequestURI(this.uri)
|
||||
if err == nil {
|
||||
env["QUERY_STRING"] = u.RawQuery
|
||||
} else {
|
||||
env["QUERY_STRING"] = this.RawReq.URL.RawQuery
|
||||
}
|
||||
}
|
||||
if !env.Has("SERVER_NAME") {
|
||||
env["SERVER_NAME"] = this.Host
|
||||
}
|
||||
if !env.Has("REQUEST_URI") {
|
||||
env["REQUEST_URI"] = this.uri
|
||||
}
|
||||
if !env.Has("HOST") {
|
||||
env["HOST"] = this.Host
|
||||
}
|
||||
|
||||
if len(this.ServerAddr) > 0 {
|
||||
if !env.Has("SERVER_ADDR") {
|
||||
env["SERVER_ADDR"] = this.ServerAddr
|
||||
}
|
||||
if !env.Has("SERVER_PORT") {
|
||||
_, port, err := net.SplitHostPort(this.ServerAddr)
|
||||
if err == nil {
|
||||
env["SERVER_PORT"] = port
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 连接池配置
|
||||
poolSize := fastcgi.PoolSize
|
||||
if poolSize <= 0 {
|
||||
poolSize = 32
|
||||
}
|
||||
|
||||
client, err := pkg.SharedPool(fastcgi.Network(), fastcgi.RealAddress(), uint(poolSize)).Client()
|
||||
if err != nil {
|
||||
this.write500(err)
|
||||
return
|
||||
}
|
||||
|
||||
// 请求相关
|
||||
if !env.Has("REQUEST_METHOD") {
|
||||
env["REQUEST_METHOD"] = this.RawReq.Method
|
||||
}
|
||||
if !env.Has("CONTENT_LENGTH") {
|
||||
env["CONTENT_LENGTH"] = fmt.Sprintf("%d", this.RawReq.ContentLength)
|
||||
}
|
||||
if !env.Has("CONTENT_TYPE") {
|
||||
env["CONTENT_TYPE"] = this.RawReq.Header.Get("Content-Type")
|
||||
}
|
||||
if !env.Has("SERVER_SOFTWARE") {
|
||||
env["SERVER_SOFTWARE"] = teaconst.ProductName + "/v" + teaconst.Version
|
||||
}
|
||||
|
||||
// 处理SCRIPT_FILENAME
|
||||
scriptPath := env.GetString("SCRIPT_FILENAME")
|
||||
if len(scriptPath) > 0 && (strings.Index(scriptPath, "/") < 0 && strings.Index(scriptPath, "\\") < 0) {
|
||||
env["SCRIPT_FILENAME"] = env.GetString("DOCUMENT_ROOT") + Tea.DS + scriptPath
|
||||
}
|
||||
scriptFilename := filepath.Base(this.RawReq.URL.Path)
|
||||
|
||||
// PATH_INFO
|
||||
pathInfoReg := fastcgi.PathInfoRegexp()
|
||||
pathInfo := ""
|
||||
if pathInfoReg != nil {
|
||||
matches := pathInfoReg.FindStringSubmatch(this.RawReq.URL.Path)
|
||||
countMatches := len(matches)
|
||||
if countMatches == 1 {
|
||||
pathInfo = matches[0]
|
||||
} else if countMatches == 2 {
|
||||
pathInfo = matches[1]
|
||||
} else if countMatches > 2 {
|
||||
scriptFilename = matches[1]
|
||||
pathInfo = matches[2]
|
||||
}
|
||||
|
||||
if !env.Has("PATH_INFO") {
|
||||
env["PATH_INFO"] = pathInfo
|
||||
}
|
||||
}
|
||||
|
||||
this.addVarMapping(map[string]string{
|
||||
"fastcgi.documentRoot": env.GetString("DOCUMENT_ROOT"),
|
||||
"fastcgi.filename": scriptFilename,
|
||||
"fastcgi.pathInfo": pathInfo,
|
||||
})
|
||||
|
||||
params := map[string]string{}
|
||||
for key, value := range env {
|
||||
params[key] = this.Format(types.String(value))
|
||||
}
|
||||
|
||||
this.processRequestHeaders(this.RawReq.Header)
|
||||
for k, v := range this.RawReq.Header {
|
||||
if k == "Connection" {
|
||||
continue
|
||||
}
|
||||
for _, subV := range v {
|
||||
params["HTTP_"+strings.ToUpper(strings.Replace(k, "-", "_", -1))] = subV
|
||||
}
|
||||
}
|
||||
|
||||
host, found := params["HTTP_HOST"]
|
||||
if !found || len(host) == 0 {
|
||||
params["HTTP_HOST"] = this.Host
|
||||
}
|
||||
|
||||
fcgiReq := pkg.NewRequest()
|
||||
fcgiReq.SetTimeout(fastcgi.ReadTimeoutDuration())
|
||||
fcgiReq.SetParams(params)
|
||||
fcgiReq.SetBody(this.RawReq.Body, uint32(this.requestLength()))
|
||||
|
||||
resp, stderr, err := client.Call(fcgiReq)
|
||||
if err != nil {
|
||||
this.write500(err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(stderr) > 0 {
|
||||
err := errors.New("Fastcgi Error: " + strings.TrimSpace(string(stderr)) + " script: " + maps.NewMap(params).GetString("SCRIPT_FILENAME"))
|
||||
this.write500(err)
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
// 设置Charset
|
||||
// TODO 这里应该可以设置文本类型的列表,以及是否强制覆盖所有文本类型的字符集
|
||||
if this.web.Charset != nil && this.web.Charset.IsOn && len(this.web.Charset.Charset) > 0 {
|
||||
contentTypes, ok := resp.Header["Content-Type"]
|
||||
if ok && len(contentTypes) > 0 {
|
||||
contentType := contentTypes[0]
|
||||
if _, found := textMimeMap[contentType]; found {
|
||||
resp.Header["Content-Type"][0] = contentType + "; charset=" + this.web.Charset.Charset
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 响应Header
|
||||
this.writer.AddHeaders(resp.Header)
|
||||
this.processResponseHeaders(resp.StatusCode)
|
||||
|
||||
// 准备
|
||||
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
|
||||
|
||||
// 设置响应代码
|
||||
this.writer.WriteHeader(resp.StatusCode)
|
||||
|
||||
// 输出到客户端
|
||||
pool := this.bytePool(resp.ContentLength)
|
||||
buf := pool.Get()
|
||||
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
|
||||
pool.Put(buf)
|
||||
|
||||
err1 := resp.Body.Close()
|
||||
if err1 != nil {
|
||||
remotelogs.Warn("REQUEST_FASTCGI", err1.Error())
|
||||
}
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
remotelogs.Warn("REQUEST_FASTCGI", err.Error())
|
||||
this.addError(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package nodes
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -12,7 +13,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
|
||||
if !u.IsOn {
|
||||
continue
|
||||
}
|
||||
if u.MatchPrefix {
|
||||
if u.MatchPrefix { // 匹配前缀
|
||||
if strings.HasPrefix(fullURL, u.BeforeURL) {
|
||||
afterURL := u.AfterURL
|
||||
if u.KeepRequestURI {
|
||||
@@ -25,7 +26,39 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
|
||||
}
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
} else if u.MatchRegexp { // 正则匹配
|
||||
reg := u.BeforeURLRegexp()
|
||||
if reg == nil {
|
||||
continue
|
||||
}
|
||||
matches := reg.FindStringSubmatch(fullURL)
|
||||
if len(matches) == 0 {
|
||||
continue
|
||||
}
|
||||
afterURL := u.AfterURL
|
||||
for i, match := range matches {
|
||||
afterURL = strings.ReplaceAll(afterURL, "${"+strconv.Itoa(i)+"}", match)
|
||||
}
|
||||
|
||||
subNames := reg.SubexpNames()
|
||||
if len(subNames) > 0 {
|
||||
for _, subName := range subNames {
|
||||
if len(subName) > 0 {
|
||||
index := reg.SubexpIndex(subName)
|
||||
if index > -1 {
|
||||
afterURL = strings.ReplaceAll(afterURL, "${"+subName+"}", matches[index])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if u.Status <= 0 {
|
||||
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
|
||||
} else {
|
||||
http.Redirect(this.RawWriter, this.RawReq, afterURL, u.Status)
|
||||
}
|
||||
return true
|
||||
} else { // 精准匹配
|
||||
if fullURL == u.RealBeforeURL() {
|
||||
if u.Status <= 0 {
|
||||
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, http.StatusTemporaryRedirect)
|
||||
|
||||
@@ -19,11 +19,10 @@ func (this *HTTPRequest) doRedirectToHTTPS(redirectToHTTPSConfig *serverconfigs.
|
||||
} else if redirectToHTTPSConfig.Port > 0 {
|
||||
lastIndex := strings.LastIndex(host, ":")
|
||||
if lastIndex > 0 {
|
||||
if redirectToHTTPSConfig.Port != 443 {
|
||||
host = host[:lastIndex] + ":" + strconv.Itoa(redirectToHTTPSConfig.Port)
|
||||
} else {
|
||||
host = host[:lastIndex]
|
||||
}
|
||||
host = host[:lastIndex]
|
||||
}
|
||||
if redirectToHTTPSConfig.Port != 443 {
|
||||
host = host + ":" + strconv.Itoa(redirectToHTTPSConfig.Port)
|
||||
}
|
||||
} else {
|
||||
lastIndex := strings.LastIndex(host, ":")
|
||||
|
||||
@@ -147,6 +147,12 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
return
|
||||
}
|
||||
|
||||
// 在HTTP/2下需要防止因为requestBody而导致Content-Length为空的问题
|
||||
if this.RawReq.ProtoMajor == 2 && this.RawReq.ContentLength == 0 {
|
||||
_ = this.RawReq.Body.Close()
|
||||
this.RawReq.Body = nil
|
||||
}
|
||||
|
||||
// 开始请求
|
||||
resp, err := client.Do(this.RawReq)
|
||||
if err != nil {
|
||||
@@ -183,7 +189,7 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
if this.doWAFResponse(resp) {
|
||||
err = resp.Body.Close()
|
||||
if err != nil {
|
||||
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
|
||||
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -195,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
|
||||
err = resp.Body.Close()
|
||||
if err != nil {
|
||||
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
|
||||
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -220,7 +226,7 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
|
||||
|
||||
// 准备
|
||||
this.writer.Prepare(resp.ContentLength)
|
||||
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
|
||||
|
||||
// 设置响应代码
|
||||
this.writer.WriteHeader(resp.StatusCode)
|
||||
@@ -250,11 +256,15 @@ func (this *HTTPRequest) doReverseProxy() {
|
||||
|
||||
err1 := resp.Body.Close()
|
||||
if err1 != nil {
|
||||
remotelogs.Error("REQUEST_REVERSE_PROXY", err1.Error())
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("REQUEST_REVERSE_PROXY", err1.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
|
||||
this.addError(err)
|
||||
if !this.canIgnore(err) {
|
||||
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
|
||||
this.addError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,7 +296,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
|
||||
this.cacheRef = nil // 不支持缓存
|
||||
}
|
||||
|
||||
this.writer.Prepare(fileSize)
|
||||
this.writer.Prepare(fileSize, http.StatusOK)
|
||||
|
||||
pool := this.bytePool(fileSize)
|
||||
buf := pool.Get()
|
||||
|
||||
@@ -50,7 +50,11 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
|
||||
}
|
||||
|
||||
this.writer.AddHeaders(resp.Header)
|
||||
this.writer.Prepare(resp.ContentLength)
|
||||
if statusCode <= 0 {
|
||||
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
|
||||
} else {
|
||||
this.writer.Prepare(resp.ContentLength, statusCode)
|
||||
}
|
||||
|
||||
// 设置响应代码
|
||||
if statusCode <= 0 {
|
||||
|
||||
@@ -64,7 +64,9 @@ func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
|
||||
}
|
||||
|
||||
// 准备输出
|
||||
func (this *HTTPWriter) Prepare(size int64) {
|
||||
func (this *HTTPWriter) Prepare(size int64, status int) {
|
||||
this.statusCode = status
|
||||
|
||||
this.prepareGzip(size)
|
||||
this.prepareCache(size)
|
||||
}
|
||||
@@ -304,7 +306,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
|
||||
|
||||
// 准备缓存
|
||||
func (this *HTTPWriter) prepareCache(size int64) {
|
||||
if this.writer == nil || size <= 0 {
|
||||
if this.writer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -319,10 +321,16 @@ func (this *HTTPWriter) prepareCache(size int64) {
|
||||
}
|
||||
|
||||
cacheRef := this.req.cacheRef
|
||||
if cacheRef == nil ||
|
||||
!cacheRef.IsOn ||
|
||||
(cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) ||
|
||||
(cachePolicy.MaxSizeBytes() > 0 && size > cachePolicy.MaxSizeBytes()) {
|
||||
if cacheRef == nil || !cacheRef.IsOn {
|
||||
return
|
||||
}
|
||||
|
||||
// 如果允许 ChunkedEncoding,就无需尺寸的判断,因为此时的 size 为 -1
|
||||
if !cacheRef.AllowChunkedEncoding && size < 0 {
|
||||
return
|
||||
}
|
||||
if size >= 0 && ((cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) ||
|
||||
(cachePolicy.MaxSizeBytes() > 0 && size > cachePolicy.MaxSizeBytes())) {
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ func (this *Listener) Listen() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
netListener = NewTrafficListener(netListener)
|
||||
events.On(events.EventQuit, func() {
|
||||
remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
|
||||
_ = netListener.Close()
|
||||
|
||||
@@ -12,21 +12,21 @@ import (
|
||||
|
||||
var sharedListenerManager = NewListenerManager()
|
||||
|
||||
// 端口监听管理器
|
||||
// ListenerManager 端口监听管理器
|
||||
type ListenerManager struct {
|
||||
listenersMap map[string]*Listener // addr => *Listener
|
||||
locker sync.Mutex
|
||||
lastConfig *nodeconfigs.NodeConfig
|
||||
}
|
||||
|
||||
// 获取新对象
|
||||
// NewListenerManager 获取新对象
|
||||
func NewListenerManager() *ListenerManager {
|
||||
return &ListenerManager{
|
||||
listenersMap: map[string]*Listener{},
|
||||
}
|
||||
}
|
||||
|
||||
// 启动监听
|
||||
// Start 启动监听
|
||||
func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
@@ -83,7 +83,13 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
|
||||
listener.Reload(group)
|
||||
err := listener.Listen()
|
||||
if err != nil {
|
||||
remotelogs.Error("LISTENER_MANAGER", err.Error())
|
||||
firstServer := group.FirstServer()
|
||||
if firstServer == nil {
|
||||
remotelogs.Error("LISTENER_MANAGER", err.Error())
|
||||
} else {
|
||||
remotelogs.ServerError(firstServer.Id, "LISTENER_MANAGER", err.Error())
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
this.listenersMap[addr] = listener
|
||||
@@ -93,7 +99,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取总的活跃连接数
|
||||
// TotalActiveConnections 获取总的活跃连接数
|
||||
func (this *ListenerManager) TotalActiveConnections() int {
|
||||
this.locker.Lock()
|
||||
defer this.locker.Unlock()
|
||||
|
||||
@@ -362,6 +362,8 @@ func (this *Node) syncConfig() error {
|
||||
}
|
||||
|
||||
nodeconfigs.ResetNodeConfig(nodeConfig)
|
||||
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
|
||||
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
|
||||
if nodeConfig.HTTPCachePolicy != nil {
|
||||
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{nodeConfig.HTTPCachePolicy})
|
||||
} else {
|
||||
|
||||
@@ -4,12 +4,15 @@ import (
|
||||
"encoding/json"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/caches"
|
||||
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/monitor"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/rpc"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/utils"
|
||||
"github.com/iwind/TeaGo/lists"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/shirou/gopsutil/cpu"
|
||||
"github.com/shirou/gopsutil/disk"
|
||||
"os"
|
||||
@@ -62,6 +65,13 @@ func (this *NodeStatusExecutor) update() {
|
||||
status.ConfigVersion = sharedNodeConfig.Version
|
||||
status.IsActive = true
|
||||
status.ConnectionCount = sharedListenerManager.TotalActiveConnections()
|
||||
status.CacheTotalDiskSize = caches.SharedManager.TotalDiskSize()
|
||||
status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize()
|
||||
|
||||
// 记录监控数据
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{
|
||||
"total": status.ConnectionCount,
|
||||
})
|
||||
|
||||
hostname, _ := os.Hostname()
|
||||
status.Hostname = hostname
|
||||
@@ -108,6 +118,11 @@ func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) {
|
||||
}
|
||||
status.CPUUsage = percents[0] / 100
|
||||
|
||||
// 记录监控数据
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemCPU, maps.Map{
|
||||
"usage": status.CPUUsage,
|
||||
})
|
||||
|
||||
if this.cpuLogicalCount == 0 && this.cpuPhysicalCount == 0 {
|
||||
this.cpuUpdatedTime = time.Now()
|
||||
|
||||
@@ -188,4 +203,11 @@ func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) {
|
||||
status.DiskTotal = total
|
||||
status.DiskUsage = float64(totalUsage) / float64(total)
|
||||
status.DiskMaxUsage = maxUsage / 100
|
||||
|
||||
// 记录监控数据
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemDisk, maps.Map{
|
||||
"total": status.DiskTotal,
|
||||
"usage": status.DiskUsage,
|
||||
"maxUsage": status.DiskMaxUsage,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ package nodes
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/monitor"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"github.com/shirou/gopsutil/load"
|
||||
"github.com/shirou/gopsutil/mem"
|
||||
)
|
||||
@@ -22,6 +24,13 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
|
||||
}
|
||||
|
||||
status.MemoryTotal = stat.Total
|
||||
|
||||
// 记录监控数据
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemMemory, maps.Map{
|
||||
"usage": status.MemoryUsage,
|
||||
"total": status.MemoryTotal,
|
||||
"used": stat.Used,
|
||||
})
|
||||
}
|
||||
|
||||
// 更新负载
|
||||
@@ -38,4 +47,11 @@ func (this *NodeStatusExecutor) updateLoad(status *nodeconfigs.NodeStatus) {
|
||||
status.Load1m = stat.Load1
|
||||
status.Load5m = stat.Load5
|
||||
status.Load15m = stat.Load15
|
||||
|
||||
// 记录监控数据
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemLoad, maps.Map{
|
||||
"load1m": status.Load1m,
|
||||
"load5m": status.Load5m,
|
||||
"load15m": status.Load15m,
|
||||
})
|
||||
}
|
||||
|
||||
92
internal/nodes/traffic_conn.go
Normal file
92
internal/nodes/traffic_conn.go
Normal file
@@ -0,0 +1,92 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/events"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/monitor"
|
||||
"github.com/iwind/TeaGo/maps"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 流量统计
|
||||
var inTrafficBytes = uint64(0)
|
||||
var outTrafficBytes = uint64(0)
|
||||
|
||||
// 发送监控流量
|
||||
func init() {
|
||||
events.On(events.EventStart, func() {
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
go func() {
|
||||
for range ticker.C {
|
||||
// 加入到数据队列中
|
||||
if inTrafficBytes > 0 {
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{
|
||||
"total": inTrafficBytes,
|
||||
})
|
||||
}
|
||||
if outTrafficBytes > 0 {
|
||||
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{
|
||||
"total": outTrafficBytes,
|
||||
})
|
||||
}
|
||||
|
||||
// 重置数据
|
||||
atomic.StoreUint64(&inTrafficBytes, 0)
|
||||
atomic.StoreUint64(&outTrafficBytes, 0)
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
|
||||
// TrafficConn 用于统计流量的连接
|
||||
type TrafficConn struct {
|
||||
rawConn net.Conn
|
||||
}
|
||||
|
||||
func NewTrafficConn(conn net.Conn) net.Conn {
|
||||
return &TrafficConn{rawConn: conn}
|
||||
}
|
||||
|
||||
func (this *TrafficConn) Read(b []byte) (n int, err error) {
|
||||
n, err = this.rawConn.Read(b)
|
||||
if n > 0 {
|
||||
atomic.AddUint64(&inTrafficBytes, uint64(n))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *TrafficConn) Write(b []byte) (n int, err error) {
|
||||
n, err = this.rawConn.Write(b)
|
||||
if n > 0 {
|
||||
atomic.AddUint64(&outTrafficBytes, uint64(n))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *TrafficConn) Close() error {
|
||||
return this.rawConn.Close()
|
||||
}
|
||||
|
||||
func (this *TrafficConn) LocalAddr() net.Addr {
|
||||
return this.rawConn.LocalAddr()
|
||||
}
|
||||
|
||||
func (this *TrafficConn) RemoteAddr() net.Addr {
|
||||
return this.rawConn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (this *TrafficConn) SetDeadline(t time.Time) error {
|
||||
return this.rawConn.SetDeadline(t)
|
||||
}
|
||||
|
||||
func (this *TrafficConn) SetReadDeadline(t time.Time) error {
|
||||
return this.rawConn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (this *TrafficConn) SetWriteDeadline(t time.Time) error {
|
||||
return this.rawConn.SetWriteDeadline(t)
|
||||
}
|
||||
30
internal/nodes/traffic_listener.go
Normal file
30
internal/nodes/traffic_listener.go
Normal file
@@ -0,0 +1,30 @@
|
||||
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
|
||||
|
||||
package nodes
|
||||
|
||||
import "net"
|
||||
|
||||
// TrafficListener 用于统计流量的网络监听
|
||||
type TrafficListener struct {
|
||||
rawListener net.Listener
|
||||
}
|
||||
|
||||
func NewTrafficListener(listener net.Listener) net.Listener {
|
||||
return &TrafficListener{rawListener: listener}
|
||||
}
|
||||
|
||||
func (this *TrafficListener) Accept() (net.Conn, error) {
|
||||
conn, err := this.rawListener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewTrafficConn(conn), nil
|
||||
}
|
||||
|
||||
func (this *TrafficListener) Close() error {
|
||||
return this.rawListener.Close()
|
||||
}
|
||||
|
||||
func (this *TrafficListener) Addr() net.Addr {
|
||||
return this.rawListener.Addr()
|
||||
}
|
||||
@@ -24,7 +24,7 @@ func init() {
|
||||
}()
|
||||
}
|
||||
|
||||
// 打印普通信息
|
||||
// Println 打印普通信息
|
||||
func Println(tag string, description string) {
|
||||
logs.Println("[" + tag + "]" + description)
|
||||
|
||||
@@ -47,7 +47,7 @@ func Println(tag string, description string) {
|
||||
}
|
||||
}
|
||||
|
||||
// 打印警告信息
|
||||
// Warn 打印警告信息
|
||||
func Warn(tag string, description string) {
|
||||
logs.Println("[" + tag + "]" + description)
|
||||
|
||||
@@ -70,7 +70,7 @@ func Warn(tag string, description string) {
|
||||
}
|
||||
}
|
||||
|
||||
// 打印错误信息
|
||||
// Error 打印错误信息
|
||||
func Error(tag string, description string) {
|
||||
logs.Println("[" + tag + "]" + description)
|
||||
|
||||
@@ -93,6 +93,30 @@ func Error(tag string, description string) {
|
||||
}
|
||||
}
|
||||
|
||||
// ServerError 打印错误信息
|
||||
func ServerError(serverId int64, tag string, description string) {
|
||||
logs.Println("[" + tag + "]" + description)
|
||||
|
||||
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
|
||||
if nodeConfig == nil {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case logChan <- &pb.NodeLog{
|
||||
Role: teaconst.Role,
|
||||
Tag: tag,
|
||||
Description: description,
|
||||
Level: "error",
|
||||
NodeId: nodeConfig.Id,
|
||||
ServerId: serverId,
|
||||
CreatedAt: time.Now().Unix(),
|
||||
}:
|
||||
default:
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 上传日志
|
||||
func uploadLogs() error {
|
||||
logList := []*pb.NodeLog{}
|
||||
|
||||
@@ -57,6 +57,10 @@ func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
|
||||
return pb.NewNodeTaskServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
func (this *RPCClient) NodeValueRPC() pb.NodeValueServiceClient {
|
||||
return pb.NewNodeValueServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
|
||||
return pb.NewHTTPAccessLogServiceClient(this.pickConn())
|
||||
}
|
||||
@@ -105,7 +109,7 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient {
|
||||
return pb.NewServerDailyStatServiceClient(this.pickConn())
|
||||
}
|
||||
|
||||
// 节点上下文信息
|
||||
// Context 节点上下文信息
|
||||
func (this *RPCClient) Context() context.Context {
|
||||
ctx := context.Background()
|
||||
m := maps.Map{
|
||||
@@ -128,7 +132,7 @@ func (this *RPCClient) Context() context.Context {
|
||||
return ctx
|
||||
}
|
||||
|
||||
// 集群上下文
|
||||
// ClusterContext 集群上下文
|
||||
func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context {
|
||||
ctx := context.Background()
|
||||
m := maps.Map{
|
||||
@@ -151,14 +155,14 @@ func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) co
|
||||
return ctx
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
// Close 关闭连接
|
||||
func (this *RPCClient) Close() {
|
||||
for _, conn := range this.conns {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// 修改配置
|
||||
// UpdateConfig 修改配置
|
||||
func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
|
||||
this.apiConfig = config
|
||||
return this.init()
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
|
||||
"github.com/TeaOSLab/EdgeNode/internal/configs"
|
||||
_ "github.com/iwind/TeaGo/bootstrap"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRPCClient_NodeRPC(t *testing.T) {
|
||||
before := time.Now()
|
||||
defer func() {
|
||||
t.Log(time.Since(before).Seconds()*1000, "ms")
|
||||
}()
|
||||
config, err := configs.LoadAPIConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rpc, err := NewRPCClient(config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp, err := rpc.NodeRPC().ComposeNodeConfig(rpc.Context(), &pb.ComposeNodeConfigRequest{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(resp)
|
||||
}
|
||||
|
||||
func TestSharedRPC_Stream(t *testing.T) {
|
||||
config, err := configs.LoadAPIConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rpc, err := NewRPCClient(config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
client, err := rpc.NodeRPC().NodeStream(rpc.Context())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = client.Send(&pb.NodeStreamRequest{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for {
|
||||
resp, err := client.Recv()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("recv:", resp)
|
||||
}
|
||||
}
|
||||
@@ -15,14 +15,14 @@ import (
|
||||
|
||||
var SharedTrafficStatManager = NewTrafficStatManager()
|
||||
|
||||
// 区域流量统计
|
||||
// TrafficStatManager 区域流量统计
|
||||
type TrafficStatManager struct {
|
||||
m map[string]int64 // [timestamp serverId] => bytes
|
||||
locker sync.Mutex
|
||||
configFunc func() *nodeconfigs.NodeConfig
|
||||
}
|
||||
|
||||
// 获取新对象
|
||||
// NewTrafficStatManager 获取新对象
|
||||
func NewTrafficStatManager() *TrafficStatManager {
|
||||
manager := &TrafficStatManager{
|
||||
m: map[string]int64{},
|
||||
@@ -31,7 +31,7 @@ func NewTrafficStatManager() *TrafficStatManager {
|
||||
return manager
|
||||
}
|
||||
|
||||
// 启动自动任务
|
||||
// Start 启动自动任务
|
||||
func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) {
|
||||
this.configFunc = configFunc
|
||||
|
||||
@@ -54,7 +54,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加流量
|
||||
// Add 添加流量
|
||||
func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
|
||||
if bytes == 0 {
|
||||
return
|
||||
@@ -68,7 +68,7 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
|
||||
this.locker.Unlock()
|
||||
}
|
||||
|
||||
// 上传流量
|
||||
// Upload 上传流量
|
||||
func (this *TrafficStatManager) Upload() error {
|
||||
config := this.configFunc()
|
||||
if config == nil {
|
||||
|
||||
@@ -23,13 +23,17 @@ func (this *Request) Raw() *http.Request {
|
||||
}
|
||||
|
||||
func (this *Request) ReadBody(max int64) (data []byte, err error) {
|
||||
data, err = ioutil.ReadAll(io.LimitReader(this.Request.Body, max))
|
||||
if this.Request.ContentLength > 0 {
|
||||
data, err = ioutil.ReadAll(io.LimitReader(this.Request.Body, max))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Request) RestoreBody(data []byte) {
|
||||
rawReader := bytes.NewBuffer(data)
|
||||
buf := make([]byte, 1024)
|
||||
io.CopyBuffer(rawReader, this.Request.Body, buf)
|
||||
this.Request.Body = ioutil.NopCloser(rawReader)
|
||||
if len(data) > 0 {
|
||||
rawReader := bytes.NewBuffer(data)
|
||||
buf := make([]byte, 1024)
|
||||
_, _ = io.CopyBuffer(rawReader, this.Request.Body, buf)
|
||||
this.Request.Body = ioutil.NopCloser(rawReader)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user