Compare commits

...

14 Commits

Author SHA1 Message Date
刘祥超
496ee6cfa0 优化代码 2021-05-24 09:37:37 +08:00
刘祥超
437914a321 支持缓存策略全局的缓存条件/X-Cache中加入更多信息 2021-05-24 09:23:51 +08:00
刘祥超
3a93bf756a 加快缓存策略启动速度 2021-05-23 22:59:00 +08:00
刘祥超
38d81f340e 调整个别日志级别 2021-05-23 20:45:14 +08:00
刘祥超
889b9d063a URL跳转支持正则匹配 2021-05-23 17:01:08 +08:00
刘祥超
4c73b3618f 优化代码 2021-05-23 16:16:56 +08:00
刘祥超
df5f50682a 不再提示http2 Stream相关错误 2021-05-23 15:50:21 +08:00
刘祥超
9545bf69db 删除一个注释 2021-05-23 14:31:25 +08:00
刘祥超
b2f18c22ee 修复缓存状态码不生效的问题 2021-05-23 14:29:56 +08:00
刘祥超
63e3b7ac2f 修复跳转到HTTPS的自定义端口无法起作用的Bug 2021-05-22 10:26:37 +08:00
刘祥超
760a62c286 修改两处日志级别 2021-05-22 10:26:12 +08:00
刘祥超
296848a6d6 优化一个文件缓存统计的Bug 2021-05-19 22:14:57 +08:00
刘祥超
4e04534244 更改版本号 2021-05-19 14:16:04 +08:00
刘祥超
cad43e610d 缓存文件列表使用sqlite管理 2021-05-19 12:07:35 +08:00
30 changed files with 973 additions and 325 deletions

View File

@@ -33,6 +33,7 @@ function build() {
mkdir $DIST/bin
mkdir $DIST/configs
mkdir $DIST/logs
mkdir $DIST/data
fi
cp $ROOT/configs/api.template.yaml $DIST/configs

1
build/data/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
index.*

2
go.mod
View File

@@ -15,10 +15,12 @@ require (
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
github.com/mattn/go-sqlite3 v1.14.7
github.com/mssola/user_agent v0.5.2
github.com/shirou/gopsutil v2.20.9+incompatible
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299
golang.org/x/text v0.3.2
google.golang.org/grpc v1.32.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
)

2
go.sum
View File

@@ -67,6 +67,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible h1:1qp9iks+69h7IGLazAplzS9Ca14HAxuD5c0rbFdPGy4=
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible/go.mod h1:+ZBN7PBoh5gG6/y0ZQ85vJDBe21WnfbRrQQwTfliJJI=
github.com/mattn/go-sqlite3 v1.14.7 h1:fxWBnXkxfM6sRiuH3bqJ4CfzZojMOLVc0UTsTglEghA=
github.com/mattn/go-sqlite3 v1.14.7/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=

View File

@@ -10,12 +10,12 @@ const (
)
type Item struct {
Type ItemType
Key string
ExpiredAt int64
HeaderSize int64
BodySize int64
MetaSize int64
Type ItemType `json:"type"`
Key string `json:"key"`
ExpiredAt int64 `json:"expiredAt"`
HeaderSize int64 `json:"headerSize"`
BodySize int64 `json:"bodySize"`
MetaSize int64 `json:"metaSize"`
}
func (this *Item) IsExpired() bool {
@@ -23,7 +23,7 @@ func (this *Item) IsExpired() bool {
}
func (this *Item) TotalSize() int64 {
return this.Size() + this.MetaSize + int64(len(this.Key))
return this.Size() + this.MetaSize + int64(len(this.Key)) + 64
}
func (this *Item) Size() int64 {

View File

@@ -1,145 +1,33 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"strings"
"sync"
)
type ListInterface interface {
Init() error
// 缓存列表管理
type List struct {
m map[string]*Item // hash => item
locker sync.RWMutex
onAdd func(item *Item)
onRemove func(item *Item)
}
func NewList() *List {
return &List{
m: map[string]*Item{},
}
}
func (this *List) Reset() {
this.locker.Lock()
this.m = map[string]*Item{}
this.locker.Unlock()
}
func (this *List) Add(hash string, item *Item) {
this.locker.Lock()
if this.onAdd != nil {
this.onAdd(item)
}
this.m[hash] = item
this.locker.Unlock()
}
func (this *List) Exist(hash string) bool {
this.locker.RLock()
defer this.locker.RUnlock()
item, ok := this.m[hash]
if !ok {
return false
}
return !item.IsExpired()
}
// 根据前缀进行查找
func (this *List) FindKeysWithPrefix(prefix string) (keys []string) {
this.locker.RLock()
defer this.locker.RUnlock()
// TODO 需要优化性能支持千万级数据低于1s的处理速度
for _, item := range this.m {
if strings.HasPrefix(item.Key, prefix) {
keys = append(keys, item.Key)
}
}
return
}
func (this *List) Remove(hash string) {
this.locker.Lock()
item, ok := this.m[hash]
if ok {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
}
this.locker.Unlock()
}
// 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *List) Purge(count int, callback func(hash string)) {
this.locker.Lock()
deletedHashList := []string{}
for hash, item := range this.m {
if count <= 0 {
break
}
if item.IsExpired() {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
deletedHashList = append(deletedHashList, hash)
}
count--
}
this.locker.Unlock()
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
callback(hash)
}
}
}
func (this *List) Stat(check func(hash string) bool) *Stat {
this.locker.RLock()
defer this.locker.RUnlock()
result := &Stat{
Count: 0,
Size: 0,
}
for hash, item := range this.m {
if !item.IsExpired() {
// 检查文件是否存在、内容是否正确等
if check != nil && check(hash) {
result.Count++
result.ValueSize += item.Size()
result.Size += item.TotalSize()
}
}
}
return result
}
// 总数量
func (this *List) Count() int64 {
this.locker.RLock()
count := int64(len(this.m))
this.locker.RUnlock()
return count
}
// 添加事件
func (this *List) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// 删除事件
func (this *List) OnRemove(f func(item *Item)) {
this.onRemove = f
Reset() error
Add(hash string, item *Item) error
Exist(hash string) (bool, error)
// FindKeysWithPrefix 根据前缀进行查找
FindKeysWithPrefix(prefix string) (keys []string, err error)
Remove(hash string) error
Purge(count int, callback func(hash string) error) error
CleanAll() error
Stat(check func(hash string) bool) (*Stat, error)
// Count 总数量
Count() (int64, error)
// OnAdd 添加事件
OnAdd(f func(item *Item))
// OnRemove 删除事件
OnRemove(f func(item *Item))
}

View File

@@ -0,0 +1,260 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
"sync/atomic"
"time"
)
// FileList 文件缓存列表管理
type FileList struct {
dir string
db *sql.DB
total int64
onAdd func(item *Item)
onRemove func(item *Item)
}
func NewFileList(dir string) ListInterface {
return &FileList{dir: dir}
}
func (this *FileList) Init() error {
db, err := sql.Open("sqlite3", "file:"+this.dir+"/index.db?cache=shared&mode=rwc")
if err != nil {
return err
}
db.SetMaxOpenConns(1)
_, err = db.Exec("VACUUM")
if err != nil {
return err
}
// 创建
// TODO accessesAt 用来存储访问时间,将来可以根据此访问时间删除不常访问的内容
// 且访问时间只需要每隔一个小时存储一个整数值即可,因为不需要那么精确
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS "cacheItems" (
"hash" varchar(32),
"key" varchar(1024),
"headerSize" integer DEFAULT 0,
"bodySize" integer DEFAULT 0,
"metaSize" integer DEFAULT 0,
"expiredAt" integer DEFAULT 0,
"accessedAt" integer DEFAULT 0
);
CREATE UNIQUE INDEX IF NOT EXISTS "hash"
ON "cacheItems" (
"hash"
);
CREATE INDEX IF NOT EXISTS "expiredAt"
ON "cacheItems" (
"expiredAt"
);
CREATE INDEX IF NOT EXISTS "accessedAt"
ON "cacheItems" (
"accessedAt"
);
`)
if err != nil {
return err
}
this.db = db
// 读取总数量
row := this.db.QueryRow("SELECT COUNT(*) FROM cacheItems")
if row.Err() != nil {
return row.Err()
}
var total int64
err = row.Scan(&total)
if err != nil {
return err
}
this.total = total
return nil
}
func (this *FileList) Reset() error {
// 不错任何事情
return nil
}
func (this *FileList) Add(hash string, item *Item) error {
_, err := this.db.Exec(`INSERT INTO cacheItems ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt") VALUES (?, ?, ?, ?, ?, ?)`, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt)
if err != nil {
return err
}
atomic.AddInt64(&this.total, 1)
if this.onAdd != nil {
this.onAdd(item)
}
return nil
}
func (this *FileList) Exist(hash string) (bool, error) {
row := this.db.QueryRow(`SELECT "bodySize" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
if row == nil {
return false, nil
}
if row.Err() != nil {
return false, row.Err()
}
var bodySize int
err := row.Scan(&bodySize)
if err != nil {
if err == sql.ErrNoRows {
return false, nil
}
return false, err
}
return true, nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *FileList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
if len(prefix) == 0 {
return
}
// TODO 需要优化上千万结果的情况
rows, err := this.db.Query(`SELECT "key" FROM cacheItems WHERE INSTR("key", ?)==1 LIMIT 100000`, prefix)
if err != nil {
return nil, err
}
defer func() {
_ = rows.Close()
}()
for rows.Next() {
var key string
err = rows.Scan(&key)
if err != nil {
return nil, err
}
keys = append(keys, key)
}
return
}
func (this *FileList) Remove(hash string) error {
row := this.db.QueryRow(`SELECT "key", "headerSize", "bodySize", "metaSize", "expiredAt" FROM cacheItems WHERE "hash"=? LIMIT 1`, hash)
if row.Err() != nil {
return row.Err()
}
var item = &Item{Type: ItemTypeFile}
err := row.Scan(&item.Key, &item.HeaderSize, &item.BodySize, &item.MetaSize, &item.ExpiredAt)
if err != nil {
if err == sql.ErrNoRows {
return nil
}
return err
}
_, err = this.db.Exec(`DELETE FROM cacheItems WHERE "hash"=?`, hash)
if err != nil {
return err
}
atomic.AddInt64(&this.total, -1)
if this.onRemove != nil {
this.onRemove(item)
}
return nil
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *FileList) Purge(count int, callback func(hash string) error) error {
if count <= 0 {
count = 1000
}
rows, err := this.db.Query(`SELECT "hash" FROM cacheItems WHERE expiredAt<=? LIMIT ?`, time.Now().Unix(), count)
if err != nil {
return err
}
defer func() {
_ = rows.Close()
}()
hashStrings := []string{}
for rows.Next() {
var hash string
err = rows.Scan(&hash)
if err != nil {
return err
}
hashStrings = append(hashStrings, hash)
}
// 不在 rows.Next() 循环中操作是为了避免死锁
for _, hash := range hashStrings {
err = this.Remove(hash)
if err != nil {
return err
}
err = callback(hash)
if err != nil {
return err
}
}
return nil
}
func (this *FileList) CleanAll() error {
_, err := this.db.Exec("DELETE FROM cacheItems")
if err != nil {
return err
}
atomic.StoreInt64(&this.total, 0)
return nil
}
func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
row := this.db.QueryRow("SELECT COUNT(*), IFNULL(SUM(headerSize+bodySize+metaSize), 0), IFNULL(SUM(headerSize+bodySize), 0) FROM cacheItems")
if row.Err() != nil {
return nil, row.Err()
}
stat := &Stat{}
err := row.Scan(&stat.Count, &stat.Size, &stat.ValueSize)
if err != nil {
return nil, err
}
return stat, nil
}
// Count 总数量
// 常用的方法,所以避免直接查询数据库
func (this *FileList) Count() (int64, error) {
return atomic.LoadInt64(&this.total), nil
}
// OnAdd 添加事件
func (this *FileList) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// OnRemove 删除事件
func (this *FileList) OnRemove(f func(item *Item)) {
this.onRemove = f
}

View File

@@ -0,0 +1,172 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package caches
import (
"github.com/iwind/TeaGo/Tea"
stringutil "github.com/iwind/TeaGo/utils/string"
"strconv"
"testing"
"time"
)
func TestFileList_Init(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Add(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.Add(stringutil.Md5("123456"), &Item{
Key: "123456",
ExpiredAt: time.Now().Unix(),
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Add_Many(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
for i := 0; i < 100_0000; i++ {
u := "http://edge.teaos.cn/123456" + strconv.Itoa(i)
err = list.Add(stringutil.Md5(u), &Item{
Key: u,
ExpiredAt: time.Now().Unix(),
HeaderSize: 1,
MetaSize: 2,
BodySize: 3,
})
if err != nil {
t.Fatal(err)
}
}
t.Log("ok")
}
func TestFileList_Exist(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
{
exists, err := list.Exist(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("exists:", exists)
}
{
exists, err := list.Exist(stringutil.Md5("654321"))
if err != nil {
t.Fatal(err)
}
t.Log("exists:", exists)
}
}
func TestFileList_FindKeysWithPrefix(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
keys, err := list.FindKeysWithPrefix("1234")
if err != nil {
t.Fatal(err)
}
t.Log("keys:", keys)
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestFileList_Remove(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
list.OnRemove(func(item *Item) {
t.Logf("remove %#v", item)
})
err = list.Remove(stringutil.Md5("123456"))
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Purge(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.Purge(2, func(hash string) error {
t.Log(hash)
return nil
})
if err != nil {
t.Fatal(err)
}
t.Log("ok")
}
func TestFileList_Stat(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
stat, err := list.Stat(nil)
if err != nil {
t.Fatal(err)
}
t.Log("count:", stat.Count, "size:", stat.Size, "valueSize:", stat.ValueSize)
}
func TestFileList_Count(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
count, err := list.Count()
if err != nil {
t.Fatal(err)
}
t.Log("count:", count)
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestFileList_CleanAll(t *testing.T) {
list := NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
err = list.CleanAll()
if err != nil {
t.Fatal(err)
}
t.Log("ok")
t.Log(list.Count())
}

View File

@@ -0,0 +1,161 @@
package caches
import (
"strings"
"sync"
)
// MemoryList 内存缓存列表管理
type MemoryList struct {
m map[string]*Item // hash => item
locker sync.RWMutex
onAdd func(item *Item)
onRemove func(item *Item)
}
func NewMemoryList() ListInterface {
return &MemoryList{
m: map[string]*Item{},
}
}
func (this *MemoryList) Init() error {
// 内存列表不需要初始化
return nil
}
func (this *MemoryList) Reset() error {
this.locker.Lock()
this.m = map[string]*Item{}
this.locker.Unlock()
return nil
}
func (this *MemoryList) Add(hash string, item *Item) error {
this.locker.Lock()
if this.onAdd != nil {
this.onAdd(item)
}
this.m[hash] = item
this.locker.Unlock()
return nil
}
func (this *MemoryList) Exist(hash string) (bool, error) {
this.locker.RLock()
defer this.locker.RUnlock()
item, ok := this.m[hash]
if !ok {
return false, nil
}
return !item.IsExpired(), nil
}
// FindKeysWithPrefix 根据前缀进行查找
func (this *MemoryList) FindKeysWithPrefix(prefix string) (keys []string, err error) {
this.locker.RLock()
defer this.locker.RUnlock()
// TODO 需要优化性能支持千万级数据低于1s的处理速度
for _, item := range this.m {
if strings.HasPrefix(item.Key, prefix) {
keys = append(keys, item.Key)
}
}
return
}
func (this *MemoryList) Remove(hash string) error {
this.locker.Lock()
item, ok := this.m[hash]
if ok {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
}
this.locker.Unlock()
return nil
}
// Purge 清理过期的缓存
// count 每次遍历的最大数量,控制此数字可以保证每次清理的时候不用花太多时间
// callback 每次发现过期key的调用
func (this *MemoryList) Purge(count int, callback func(hash string) error) error {
this.locker.Lock()
deletedHashList := []string{}
for hash, item := range this.m {
if count <= 0 {
break
}
if item.IsExpired() {
if this.onRemove != nil {
this.onRemove(item)
}
delete(this.m, hash)
deletedHashList = append(deletedHashList, hash)
}
count--
}
this.locker.Unlock()
// 执行外部操作
for _, hash := range deletedHashList {
if callback != nil {
err := callback(hash)
if err != nil {
return err
}
}
}
return nil
}
func (this *MemoryList) CleanAll() error {
return this.Reset()
}
func (this *MemoryList) Stat(check func(hash string) bool) (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
result := &Stat{
Count: 0,
Size: 0,
}
for hash, item := range this.m {
if !item.IsExpired() {
// 检查文件是否存在、内容是否正确等
if check != nil && check(hash) {
result.Count++
result.ValueSize += item.Size()
result.Size += item.TotalSize()
}
}
}
return result, nil
}
// Count 总数量
func (this *MemoryList) Count() (int64, error) {
this.locker.RLock()
count := int64(len(this.m))
this.locker.RUnlock()
return count, nil
}
// OnAdd 添加事件
func (this *MemoryList) OnAdd(f func(item *Item)) {
this.onAdd = f
}
// OnRemove 删除事件
func (this *MemoryList) OnRemove(f func(item *Item)) {
this.onRemove = f
}

View File

@@ -10,13 +10,13 @@ import (
)
func TestList_Add(t *testing.T) {
list := NewList()
list.Add("a", &Item{
list := &MemoryList{}
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
@@ -25,72 +25,73 @@ func TestList_Add(t *testing.T) {
}
func TestList_Remove(t *testing.T) {
list := NewList()
list.Add("a", &Item{
list := &MemoryList{}
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Remove("b")
_ = list.Remove("b")
t.Log(list.m)
}
func TestList_Purge(t *testing.T) {
list := NewList()
list.Add("a", &Item{
list := &MemoryList{}
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("c", &Item{
_ = list.Add("c", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix() - 3600,
HeaderSize: 1024,
})
list.Add("d", &Item{
_ = list.Add("d", &Item{
Key: "d1",
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
list.Purge(100, func(hash string) {
_ = list.Purge(100, func(hash string) error {
t.Log("delete:", hash)
return nil
})
t.Log(list.m)
}
func TestList_Stat(t *testing.T) {
list := NewList()
list.Add("a", &Item{
list := &MemoryList{}
_ = list.Add("a", &Item{
Key: "a1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("b", &Item{
_ = list.Add("b", &Item{
Key: "b1",
ExpiredAt: time.Now().Unix() + 3600,
HeaderSize: 1024,
})
list.Add("c", &Item{
_ = list.Add("c", &Item{
Key: "c1",
ExpiredAt: time.Now().Unix(),
HeaderSize: 1024,
})
list.Add("d", &Item{
_ = list.Add("d", &Item{
Key: "d1",
ExpiredAt: time.Now().Unix() - 2,
HeaderSize: 1024,
})
result := list.Stat(func(hash string) bool {
result, _ := list.Stat(func(hash string) bool {
// 随机测试
rand.Seed(time.Now().UnixNano())
return rand.Int()%2 == 0
@@ -99,11 +100,11 @@ func TestList_Stat(t *testing.T) {
}
func TestList_FindKeysWithPrefix(t *testing.T) {
list := NewList()
list := &MemoryList{}
before := time.Now()
for i := 0; i < 1_000_000; i++ {
key := "http://www.teaos.cn/hello" + strconv.Itoa(i/100000) + "/" + strconv.Itoa(i) + ".html"
list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
_ = list.Add(fmt.Sprintf("%d", xxhash.Sum64String(key)), &Item{
Key: key,
ExpiredAt: 0,
BodySize: 0,
@@ -113,7 +114,10 @@ func TestList_FindKeysWithPrefix(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
before = time.Now()
keys := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000")
keys, err := list.FindKeysWithPrefix("http://www.teaos.cn/hello/5000")
if err != nil {
t.Fatal(err)
}
t.Log(len(keys))
t.Log(time.Since(before).Seconds()*1000, "ms")
}

View File

@@ -3,27 +3,30 @@ package caches
type ReaderFunc func(n int) (goNext bool, err error)
type Reader interface {
// 初始化
// Init 初始化
Init() error
// 状态码
// TypeName 类型名称
TypeName() string
// Status 状态码
Status() int
// 读取Header
// ReadHeader 读取Header
ReadHeader(buf []byte, callback ReaderFunc) error
// 读取Body
// ReadBody 读取Body
ReadBody(buf []byte, callback ReaderFunc) error
// 读取某个范围内的Body
// ReadBodyRange 读取某个范围内的Body
ReadBodyRange(buf []byte, start int64, end int64, callback ReaderFunc) error
// Header Size
// HeaderSize Header Size
HeaderSize() int64
// Body Size
// BodySize Body Size
BodySize() int64
// 关闭
// Close 关闭
Close() error
}

View File

@@ -106,6 +106,10 @@ func (this *FileReader) Init() error {
return nil
}
func (this *FileReader) TypeName() string {
return "disk"
}
func (this *FileReader) Status() int {
return this.status
}

View File

@@ -16,6 +16,10 @@ func (this *MemoryReader) Init() error {
return nil
}
func (this *MemoryReader) TypeName() string {
return "memory"
}
func (this *MemoryReader) Status() int {
return this.item.Status
}

View File

@@ -12,6 +12,8 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
stringutil "github.com/iwind/TeaGo/utils/string"
"golang.org/x/text/language"
"golang.org/x/text/message"
"io"
"os"
"path/filepath"
@@ -49,7 +51,7 @@ type FileStorage struct {
memoryStorage *MemoryStorage // 一级缓存
totalSize int64
list *List
list ListInterface
locker sync.RWMutex
ticker *utils.Ticker
}
@@ -57,7 +59,6 @@ type FileStorage struct {
func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
return &FileStorage{
policy: policy,
list: NewList(),
}
}
@@ -68,41 +69,10 @@ func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
// Init 初始化
func (this *FileStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
this.locker.Lock()
defer this.locker.Unlock()
before := time.Now()
cacheDir := ""
defer func() {
// 统计
count := 0
size := int64(0)
if this.list != nil {
stat := this.list.Stat(func(hash string) bool {
return true
})
count = stat.Count
size = stat.Size
}
cost := time.Since(before).Seconds() * 1000
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
if size > 1024*1024*1024 {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
} else if size > 1024*1024 {
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
} else if size > 1024 {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+strconv.Itoa(count)+", size: "+sizeMB)
}()
// 配置
cacheConfig := &serverconfigs.HTTPFileCacheStorage{}
@@ -115,7 +85,7 @@ func (this *FileStorage) Init() error {
return err
}
this.cacheConfig = cacheConfig
cacheDir = cacheConfig.Dir
cacheDir := cacheConfig.Dir
if !filepath.IsAbs(this.cacheConfig.Dir) {
this.cacheConfig.Dir = Tea.Root + Tea.DS + this.cacheConfig.Dir
@@ -127,6 +97,26 @@ func (this *FileStorage) Init() error {
return errors.New("[CACHE]cache storage dir can not be empty")
}
list := NewFileList(dir + "/p" + strconv.FormatInt(this.policy.Id, 10))
err = list.Init()
if err != nil {
return err
}
this.list = list
stat, err := list.Stat(func(hash string) bool {
return true
})
if err != nil {
return err
}
this.totalSize = stat.Size
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
// 检查目录是否存在
_, err = os.Stat(dir)
if err != nil {
@@ -140,6 +130,23 @@ func (this *FileStorage) Init() error {
}
}
defer func() {
// 统计
count := stat.Count
size := stat.Size
cost := time.Since(before).Seconds() * 1000
sizeMB := strconv.FormatInt(size, 10) + " Bytes"
if size > 1024*1024*1024 {
sizeMB = fmt.Sprintf("%.3f G", float64(size)/1024/1024/1024)
} else if size > 1024*1024 {
sizeMB = fmt.Sprintf("%.3f M", float64(size)/1024/1024)
} else if size > 1024 {
sizeMB = fmt.Sprintf("%.3f K", float64(size)/1024)
}
remotelogs.Println("CACHE", "init policy "+strconv.FormatInt(this.policy.Id, 10)+" from '"+cacheDir+"', cost: "+fmt.Sprintf("%.2f", cost)+" ms, count: "+message.NewPrinter(language.English).Sprintf("%d", count)+", size: "+sizeMB)
}()
// 初始化list
err = this.initList()
if err != nil {
@@ -188,10 +195,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
}
}
hash, path := this.keyPath(key)
if !this.list.Exist(hash) {
return nil, ErrNotFound
}
_, path := this.keyPath(key)
// TODO 尝试使用mmap加快读取速度
fp, err := os.OpenFile(path, os.O_RDONLY, 0444)
@@ -224,17 +228,21 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
}
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
count, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && count > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage")
}
capacityBytes := this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
return nil, errors.New("write file cache failed: over disk size, current total size: " + strconv.FormatInt(this.totalSize, 10) + " bytes, capacity: " + strconv.FormatInt(capacityBytes, 10))
}
hash := stringutil.Md5(key)
dir := this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/" + hash[:2] + "/" + hash[2:4]
_, err := os.Stat(dir)
_, err = os.Stat(dir)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
@@ -246,7 +254,10 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
}
// 先删除
this.list.Remove(hash)
err = this.list.Remove(hash)
if err != nil {
return nil, err
}
path := dir + "/" + hash + ".cache.tmp"
writer, err := os.OpenFile(path, os.O_CREATE|os.O_SYNC|os.O_WRONLY, 0666)
@@ -351,7 +362,10 @@ func (this *FileStorage) AddToList(item *Item) {
item.MetaSize = SizeMeta
hash := stringutil.Md5(item.Key)
this.list.Add(hash, item)
err := this.list.Add(hash, item)
if err != nil && !strings.Contains(err.Error(), "UNIQUE constraint failed") {
remotelogs.Error("CACHE", "add to list failed: "+err.Error())
}
}
// Delete 删除某个键值对应的缓存
@@ -365,8 +379,11 @@ func (this *FileStorage) Delete(key string) error {
}
hash, path := this.keyPath(key)
this.list.Remove(hash)
err := os.Remove(path)
err := this.list.Remove(hash)
if err != nil {
return err
}
err = os.Remove(path)
if err == nil || os.IsNotExist(err) {
return nil
}
@@ -380,7 +397,7 @@ func (this *FileStorage) Stat() (*Stat, error) {
return this.list.Stat(func(hash string) bool {
return true
}), nil
})
}
// CleanAll 清除所有的缓存
@@ -393,7 +410,10 @@ func (this *FileStorage) CleanAll() error {
_ = this.memoryStorage.CleanAll()
}
this.list.Reset()
err := this.list.CleanAll()
if err != nil {
return err
}
// 删除缓存和目录
// 不能直接删除子目录,比较危险
@@ -455,7 +475,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
subKeys, err := this.list.FindKeysWithPrefix(key)
if err != nil {
return err
}
resultKeys = append(resultKeys, subKeys...)
}
keys = resultKeys
}
@@ -463,7 +487,11 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
// 文件
for _, key := range keys {
hash, path := this.keyPath(key)
if !this.list.Exist(hash) {
exists, err := this.list.Exist(hash)
if err != nil {
return err
}
if !exists {
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
@@ -471,11 +499,14 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
continue
}
err := os.Remove(path)
err = os.Remove(path)
if err != nil && !os.IsNotExist(err) {
return err
}
this.list.Remove(hash)
err = this.list.Remove(hash)
if err != nil {
return err
}
}
return nil
}
@@ -490,7 +521,7 @@ func (this *FileStorage) Stop() {
this.memoryStorage.Stop()
}
this.list.Reset()
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
@@ -534,45 +565,23 @@ func (this *FileStorage) hashPath(hash string) (path string) {
// 初始化List
func (this *FileStorage) initList() error {
this.list.Reset()
dir := this.dir()
// 清除tmp
files, err := filepath.Glob(dir + "/*/*/*.cache.tmp")
err := this.list.Reset()
if err != nil {
return err
}
for _, path := range files {
_ = os.Remove(path)
}
// 加载缓存
files, err = filepath.Glob(dir + "/*/*/*.cache")
if err != nil {
return err
}
for _, path := range files {
basename := filepath.Base(path)
index := strings.LastIndex(basename, ".")
if index < 0 {
continue
}
hash := basename[:index]
// 使用异步防止阻塞主线程
go func() {
dir := this.dir()
// 解析文件信息
item, err := this.decodeFile(path)
if err != nil {
if err != ErrNotFound {
remotelogs.Error("CACHE", "decode path '"+path+"': "+err.Error())
// 清除tmp
files, err := filepath.Glob(dir + "/*/*/*.cache.tmp")
if err == nil && len(files) > 0 {
for _, path := range files {
_ = os.Remove(path)
}
continue
}
if item == nil {
continue
}
this.list.Add(hash, item)
}
}()
// 启动定时清理任务
this.ticker = utils.NewTicker(30 * time.Second)
@@ -686,12 +695,13 @@ func (this *FileStorage) decodeFile(path string) (*Item, error) {
// 清理任务
func (this *FileStorage) purgeLoop() {
this.list.Purge(1000, func(hash string) {
_ = this.list.Purge(1000, func(hash string) error {
path := this.hashPath(hash)
err := os.Remove(path)
if err != nil && !os.IsNotExist(err) {
remotelogs.Error("CACHE", "purge '"+path+"' error: "+err.Error())
}
return nil
})
}

View File

@@ -40,7 +40,7 @@ func TestFileStorage_Init(t *testing.T) {
time.Sleep(2 * time.Second)
storage.purgeLoop()
t.Log(len(storage.list.m), "entries left")
t.Log(storage.list.(*FileList).total, "entries left")
}
func TestFileStorage_OpenWriter(t *testing.T) {
@@ -441,14 +441,16 @@ func TestFileStorage_CleanAll(t *testing.T) {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
t.Log("before:", storage.list.m)
c, _ := storage.list.Count()
t.Log("before:", c)
err = storage.CleanAll()
if err != nil {
t.Fatal(err)
}
t.Log("after:", storage.list.m)
c, _ = storage.list.Count()
t.Log("after:", c)
t.Log("ok")
}

View File

@@ -22,7 +22,7 @@ type MemoryItem struct {
type MemoryStorage struct {
policy *serverconfigs.HTTPCachePolicy
list *List
list ListInterface
locker *sync.RWMutex
valuesMap map[uint64]*MemoryItem
ticker *utils.Ticker
@@ -33,7 +33,7 @@ type MemoryStorage struct {
func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
return &MemoryStorage{
policy: policy,
list: NewList(),
list: NewMemoryList(),
locker: &sync.RWMutex{},
valuesMap: map[uint64]*MemoryItem{},
}
@@ -42,10 +42,10 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
// Init 初始化
func (this *MemoryStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.Size())
atomic.AddInt64(&this.totalSize, item.TotalSize())
})
this.list.OnRemove(func(item *Item) {
atomic.AddInt64(&this.totalSize, -item.Size())
atomic.AddInt64(&this.totalSize, -item.TotalSize())
})
if this.purgeDuration <= 0 {
@@ -92,7 +92,11 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
totalKeys, err := this.list.Count()
if err != nil {
return nil, err
}
if this.policy.MaxKeys > 0 && totalKeys > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage")
}
capacityBytes := this.memoryCapacityBytes()
@@ -101,7 +105,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
}
// 先删除
err := this.Delete(key)
err = this.Delete(key)
if err != nil {
return nil, err
}
@@ -114,7 +118,7 @@ func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
delete(this.valuesMap, hash)
this.list.Remove(fmt.Sprintf("%d", hash))
_ = this.list.Remove(fmt.Sprintf("%d", hash))
this.locker.Unlock()
return nil
}
@@ -126,14 +130,14 @@ func (this *MemoryStorage) Stat() (*Stat, error) {
return this.list.Stat(func(hash string) bool {
return true
}), nil
})
}
// CleanAll 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
_ = this.list.Reset()
atomic.StoreInt64(&this.totalSize, 0)
this.locker.Unlock()
return nil
@@ -145,7 +149,11 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
if urlType == "dir" {
resultKeys := []string{}
for _, key := range keys {
resultKeys = append(resultKeys, this.list.FindKeysWithPrefix(key)...)
subKeys, err := this.list.FindKeysWithPrefix(key)
if err != nil {
return err
}
resultKeys = append(resultKeys, subKeys...)
}
keys = resultKeys
}
@@ -165,7 +173,7 @@ func (this *MemoryStorage) Stop() {
defer this.locker.Unlock()
this.valuesMap = map[uint64]*MemoryItem{}
this.list.Reset()
_ = this.list.Reset()
if this.ticker != nil {
this.ticker.Stop()
}
@@ -180,7 +188,7 @@ func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
func (this *MemoryStorage) AddToList(item *Item) {
item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
hash := fmt.Sprintf("%d", this.hash(item.Key))
this.list.Add(hash, item)
_ = this.list.Add(hash, item)
}
// TotalDiskSize 消耗的磁盘尺寸
@@ -200,13 +208,14 @@ func (this *MemoryStorage) hash(key string) uint64 {
// 清理任务
func (this *MemoryStorage) purgeLoop() {
this.list.Purge(2048, func(hash string) {
_ = this.list.Purge(2048, func(hash string) error {
uintHash, err := strconv.ParseUint(hash, 10, 64)
if err == nil {
this.locker.Lock()
delete(this.valuesMap, uintHash)
this.locker.Unlock()
}
return nil
})
}

View File

@@ -174,7 +174,8 @@ func TestMemoryStorage_CleanAll(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log(storage.list.Count(), len(storage.valuesMap))
total, _ := storage.list.Count()
t.Log(total, len(storage.valuesMap))
}
func TestMemoryStorage_Purge(t *testing.T) {
@@ -208,7 +209,8 @@ func TestMemoryStorage_Purge(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Log(storage.list.Count(), len(storage.valuesMap))
total, _ := storage.list.Count()
t.Log(total, len(storage.valuesMap))
}
func TestMemoryStorage_Expire(t *testing.T) {

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.1.0"
Version = "0.1.1"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -47,7 +47,7 @@ func (this *IPListManager) Start() {
// 第一次读取
err := this.loop()
if err != nil {
remotelogs.Println("IP_LIST_MANAGER", err.Error())
remotelogs.Error("IP_LIST_MANAGER", err.Error())
}
ticker := time.NewTicker(60 * time.Second)
@@ -64,7 +64,7 @@ func (this *IPListManager) Start() {
if err != nil {
countErrors++
remotelogs.Println("IP_LIST_MANAGER", err.Error())
remotelogs.Error("IP_LIST_MANAGER", err.Error())
// 连续错误小于3次的我们立即重试
if countErrors <= 3 {

View File

@@ -1,6 +1,7 @@
package nodes
import (
"context"
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/configutils"
@@ -9,6 +10,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/stats"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/types"
"golang.org/x/net/http2"
"net"
"net/http"
"net/url"
@@ -1133,3 +1135,24 @@ func (this *HTTPRequest) bytePool(contentLength int64) *utils.BytePool {
}
return bytePool128k
}
// 检查是否可以忽略错误
func (this *HTTPRequest) canIgnore(err error) bool {
if err == nil {
return true
}
// 客户端主动取消
if err == context.Canceled {
return true
}
// HTTP/2流错误
{
_, ok := err.(http2.StreamError)
if ok {
return true
}
}
return false
}

View File

@@ -12,7 +12,12 @@ import (
// 读取缓存
func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
if this.web.Cache == nil || !this.web.Cache.IsOn || len(this.web.Cache.CacheRefs) == 0 {
cachePolicy := sharedNodeConfig.HTTPCachePolicy
if cachePolicy == nil || !cachePolicy.IsOn {
return
}
if this.web.Cache == nil || !this.web.Cache.IsOn || (len(cachePolicy.CacheRefs) == 0 && len(this.web.Cache.CacheRefs) == 0) {
return
}
var addStatusHeader = this.web.Cache.AddStatusHeader
@@ -25,12 +30,8 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}()
}
cachePolicy := sharedNodeConfig.HTTPCachePolicy
if cachePolicy == nil || !cachePolicy.IsOn {
return
}
// 检查条件
// 检查服务独立的缓存条件
refType := ""
for _, cacheRef := range this.web.Cache.CacheRefs {
if !cacheRef.IsOn ||
cacheRef.Conds == nil ||
@@ -39,11 +40,28 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}
if cacheRef.Conds.MatchRequest(this.Format) {
this.cacheRef = cacheRef
refType = "server"
break
}
}
if this.cacheRef == nil {
return
// 检查策略默认的缓存条件
for _, cacheRef := range cachePolicy.CacheRefs {
if !cacheRef.IsOn ||
cacheRef.Conds == nil ||
!cacheRef.Conds.HasRequestConds() {
continue
}
if cacheRef.Conds.MatchRequest(this.Format) {
this.cacheRef = cacheRef
refType = "policy"
break
}
}
if this.cacheRef == nil {
return
}
}
// 相关变量
@@ -89,7 +107,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return
}
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
defer func() {
@@ -121,12 +141,14 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true, nil
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
if addStatusHeader {
this.writer.Header().Set("X-Cache", "HIT")
this.writer.Header().Set("X-Cache", "HIT, "+refType+", "+reader.TypeName())
}
this.processResponseHeaders(reader.Status())
@@ -211,7 +233,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
this.writer.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return true
}
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
} else if len(rangeSet) > 1 {
@@ -252,7 +276,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true, err
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return true
}
}
@@ -273,7 +299,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true, nil
})
if err != nil {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
if !this.canIgnore(err) {
remotelogs.Error("REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
}

View File

@@ -189,7 +189,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
this.processResponseHeaders(resp.StatusCode)
// 准备
this.writer.Prepare(resp.ContentLength)
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
// 设置响应代码
this.writer.WriteHeader(resp.StatusCode)
@@ -202,11 +202,11 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
err1 := resp.Body.Close()
if err1 != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err1.Error())
remotelogs.Warn("REQUEST_FASTCGI", err1.Error())
}
if err != nil && err != io.EOF {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("REQUEST_FASTCGI", err.Error())
this.addError(err)
}
return

View File

@@ -2,6 +2,7 @@ package nodes
import (
"net/http"
"strconv"
"strings"
)
@@ -12,7 +13,7 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
if !u.IsOn {
continue
}
if u.MatchPrefix {
if u.MatchPrefix { // 匹配前缀
if strings.HasPrefix(fullURL, u.BeforeURL) {
afterURL := u.AfterURL
if u.KeepRequestURI {
@@ -25,7 +26,39 @@ func (this *HTTPRequest) doHostRedirect() (blocked bool) {
}
return true
}
} else {
} else if u.MatchRegexp { // 正则匹配
reg := u.BeforeURLRegexp()
if reg == nil {
continue
}
matches := reg.FindStringSubmatch(fullURL)
if len(matches) == 0 {
continue
}
afterURL := u.AfterURL
for i, match := range matches {
afterURL = strings.ReplaceAll(afterURL, "${"+strconv.Itoa(i)+"}", match)
}
subNames := reg.SubexpNames()
if len(subNames) > 0 {
for _, subName := range subNames {
if len(subName) > 0 {
index := reg.SubexpIndex(subName)
if index > -1 {
afterURL = strings.ReplaceAll(afterURL, "${"+subName+"}", matches[index])
}
}
}
}
if u.Status <= 0 {
http.Redirect(this.RawWriter, this.RawReq, afterURL, http.StatusTemporaryRedirect)
} else {
http.Redirect(this.RawWriter, this.RawReq, afterURL, u.Status)
}
return true
} else { // 精准匹配
if fullURL == u.RealBeforeURL() {
if u.Status <= 0 {
http.Redirect(this.RawWriter, this.RawReq, u.AfterURL, http.StatusTemporaryRedirect)

View File

@@ -19,11 +19,10 @@ func (this *HTTPRequest) doRedirectToHTTPS(redirectToHTTPSConfig *serverconfigs.
} else if redirectToHTTPSConfig.Port > 0 {
lastIndex := strings.LastIndex(host, ":")
if lastIndex > 0 {
if redirectToHTTPSConfig.Port != 443 {
host = host[:lastIndex] + ":" + strconv.Itoa(redirectToHTTPSConfig.Port)
} else {
host = host[:lastIndex]
}
host = host[:lastIndex]
}
if redirectToHTTPSConfig.Port != 443 {
host = host + ":" + strconv.Itoa(redirectToHTTPSConfig.Port)
}
} else {
lastIndex := strings.LastIndex(host, ":")

View File

@@ -189,7 +189,7 @@ func (this *HTTPRequest) doReverseProxy() {
if this.doWAFResponse(resp) {
err = resp.Body.Close()
if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
}
return
}
@@ -201,7 +201,7 @@ func (this *HTTPRequest) doReverseProxy() {
if len(this.web.Pages) > 0 && this.doPage(resp.StatusCode) {
err = resp.Body.Close()
if err != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
}
return
}
@@ -226,7 +226,7 @@ func (this *HTTPRequest) doReverseProxy() {
shouldAutoFlush := this.reverseProxy.AutoFlush || this.RawReq.Header.Get("Accept") == "text/event-stream"
// 准备
this.writer.Prepare(resp.ContentLength)
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
// 设置响应代码
this.writer.WriteHeader(resp.StatusCode)
@@ -256,11 +256,15 @@ func (this *HTTPRequest) doReverseProxy() {
err1 := resp.Body.Close()
if err1 != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err1.Error())
if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err1.Error())
}
}
if err != nil && err != io.EOF {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
this.addError(err)
if !this.canIgnore(err) {
remotelogs.Warn("REQUEST_REVERSE_PROXY", err.Error())
this.addError(err)
}
}
}

View File

@@ -296,7 +296,7 @@ func (this *HTTPRequest) doRoot() (isBreak bool) {
this.cacheRef = nil // 不支持缓存
}
this.writer.Prepare(fileSize)
this.writer.Prepare(fileSize, http.StatusOK)
pool := this.bytePool(fileSize)
buf := pool.Get()

View File

@@ -50,7 +50,11 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
}
this.writer.AddHeaders(resp.Header)
this.writer.Prepare(resp.ContentLength)
if statusCode <= 0 {
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
} else {
this.writer.Prepare(resp.ContentLength, statusCode)
}
// 设置响应代码
if statusCode <= 0 {

View File

@@ -64,7 +64,9 @@ func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
}
// 准备输出
func (this *HTTPWriter) Prepare(size int64) {
func (this *HTTPWriter) Prepare(size int64, status int) {
this.statusCode = status
this.prepareGzip(size)
this.prepareCache(size)
}

View File

@@ -12,21 +12,21 @@ import (
var sharedListenerManager = NewListenerManager()
// 端口监听管理器
// ListenerManager 端口监听管理器
type ListenerManager struct {
listenersMap map[string]*Listener // addr => *Listener
locker sync.Mutex
lastConfig *nodeconfigs.NodeConfig
}
// 获取新对象
// NewListenerManager 获取新对象
func NewListenerManager() *ListenerManager {
return &ListenerManager{
listenersMap: map[string]*Listener{},
}
}
// 启动监听
// Start 启动监听
func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -83,7 +83,13 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
listener.Reload(group)
err := listener.Listen()
if err != nil {
remotelogs.Error("LISTENER_MANAGER", err.Error())
firstServer := group.FirstServer()
if firstServer == nil {
remotelogs.Error("LISTENER_MANAGER", err.Error())
} else {
remotelogs.ServerError(firstServer.Id, "LISTENER_MANAGER", err.Error())
}
continue
}
this.listenersMap[addr] = listener
@@ -93,7 +99,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
return nil
}
// 获取总的活跃连接数
// TotalActiveConnections 获取总的活跃连接数
func (this *ListenerManager) TotalActiveConnections() int {
this.locker.Lock()
defer this.locker.Unlock()

View File

@@ -24,7 +24,7 @@ func init() {
}()
}
// 打印普通信息
// Println 打印普通信息
func Println(tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -47,7 +47,7 @@ func Println(tag string, description string) {
}
}
// 打印警告信息
// Warn 打印警告信息
func Warn(tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -70,7 +70,7 @@ func Warn(tag string, description string) {
}
}
// 打印错误信息
// Error 打印错误信息
func Error(tag string, description string) {
logs.Println("[" + tag + "]" + description)
@@ -93,6 +93,30 @@ func Error(tag string, description string) {
}
}
// ServerError 打印错误信息
func ServerError(serverId int64, tag string, description string) {
logs.Println("[" + tag + "]" + description)
nodeConfig, _ := nodeconfigs.SharedNodeConfig()
if nodeConfig == nil {
return
}
select {
case logChan <- &pb.NodeLog{
Role: teaconst.Role,
Tag: tag,
Description: description,
Level: "error",
NodeId: nodeConfig.Id,
ServerId: serverId,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// 上传日志
func uploadLogs() error {
logList := []*pb.NodeLog{}