Compare commits

...

5 Commits

Author SHA1 Message Date
刘祥超
1a1280b76e 优化代码 2022-08-19 14:50:26 +08:00
刘祥超
85d3b1169f 修复Gif中透明图转换WebP失败的问题 2022-08-19 13:39:46 +08:00
刘祥超
1b32292e0c WebP压缩Gif时,自动跳过转换失败的帧,并只提示最后一次错误 2022-08-19 13:27:18 +08:00
刘祥超
e6ba44fb2f [SQLITE]使用事务批量提交一些缓存相关任务 2022-08-17 21:04:00 +08:00
刘祥超
9b6e022f46 将版本修改为0.5.2 2022-08-17 18:56:48 +08:00
9 changed files with 250 additions and 30 deletions

2
go.mod
View File

@@ -20,7 +20,7 @@ require (
github.com/iwind/TeaGo v0.0.0-20220807030847-31de8e1cbe55
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11
github.com/iwind/gosock v0.0.0-20211103081026-ee4652210ca4
github.com/iwind/gowebp v0.0.0-20220808073410-ac25a4258cf3
github.com/iwind/gowebp v0.0.0-20220819053541-c235395277b5
github.com/klauspost/compress v1.15.8
github.com/mattn/go-sqlite3 v1.14.9
github.com/mdlayher/netlink v1.4.2

4
go.sum
View File

@@ -88,8 +88,8 @@ github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11 h1:DaQjoWZhLNxjhIXedV
github.com/iwind/gofcgi v0.0.0-20210528023741-a92711d45f11/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/iwind/gosock v0.0.0-20211103081026-ee4652210ca4 h1:VWGsCqTzObdlbf7UUE3oceIpcEKi4C/YBUszQXk118A=
github.com/iwind/gosock v0.0.0-20211103081026-ee4652210ca4/go.mod h1:H5Q7SXwbx3a97ecJkaS2sD77gspzE7HFUafBO0peEyA=
github.com/iwind/gowebp v0.0.0-20220808073410-ac25a4258cf3 h1:2azv4MO3Y4/1bxlx4eLMp/h8Mmq4banSjvgyBpc53Kc=
github.com/iwind/gowebp v0.0.0-20220808073410-ac25a4258cf3/go.mod h1:Re7TEhwL+ygnxFg52fC0PWy01ULAIZp2QR0q5WwEOQA=
github.com/iwind/gowebp v0.0.0-20220819053541-c235395277b5 h1:SgKhrqRhWVpu0ZKxQM8MGjdhy7hlH3Xax8snwsZWSic=
github.com/iwind/gowebp v0.0.0-20220819053541-c235395277b5/go.mod h1:Re7TEhwL+ygnxFg52fC0PWy01ULAIZp2QR0q5WwEOQA=
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 h1:uhL5Gw7BINiiPAo24A2sxkcDI0Jt/sqp1v5xQCniEFA=
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw=

View File

@@ -102,7 +102,7 @@ func (this *FileList) Add(hash string, item *Item) error {
return nil
}
err := db.Add(hash, item)
err := db.AddAsync(hash, item)
if err != nil {
return err
}
@@ -266,7 +266,7 @@ func (this *FileList) Stat(check func(hash string) bool) (*Stat, error) {
// 这里不设置过期时间、不使用 check 函数,目的是让查询更快速一些
_ = check
row := db.statStmt.QueryRow()
var row = db.statStmt.QueryRow()
if row.Err() != nil {
return nil, row.Err()
}
@@ -357,7 +357,7 @@ func (this *FileList) remove(hash string) (notFound bool, err error) {
return false, err
}
_, err = db.deleteByHashStmt.Exec(hash)
err = db.DeleteAsync(hash)
if err != nil {
return false, db.WrapError(err)
}

View File

@@ -6,6 +6,8 @@ import (
"database/sql"
"errors"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/goman"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/dbs"
"github.com/iwind/TeaGo/types"
@@ -21,6 +23,8 @@ type FileListDB struct {
readDB *dbs.DB
writeDB *dbs.DB
writeBatch *dbs.Batch
itemsTableName string
hitsTableName string
@@ -30,10 +34,16 @@ type FileListDB struct {
isReady bool
// cacheItems
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
insertStmt *dbs.Stmt // 写入数据
selectByHashStmt *dbs.Stmt // 使用hash查询数据
deleteByHashStmt *dbs.Stmt // 根据hash删除数据
existsByHashStmt *dbs.Stmt // 根据hash检查是否存在
insertStmt *dbs.Stmt // 写入数据
insertSQL string
selectByHashStmt *dbs.Stmt // 使用hash查询数据
deleteByHashStmt *dbs.Stmt // 根据hash删除数据
deleteByHashSQL string
statStmt *dbs.Stmt // 统计
purgeStmt *dbs.Stmt // 清理
deleteAllStmt *dbs.Stmt // 删除所有数据
@@ -69,8 +79,17 @@ func (this *FileListDB) Open(dbPath string) error {
}**/
this.writeDB = dbs.NewDB(writeDB)
this.writeBatch = dbs.NewBatch(writeDB, 4)
this.writeBatch.OnFail(func(err error) {
remotelogs.Warn("LIST_FILE_DB", "run batch failed: "+err.Error())
})
goman.New(func() {
this.writeBatch.Exec()
})
if teaconst.EnableDBStat {
this.writeBatch.EnableStat(true)
this.writeDB.EnableStat(true)
}
@@ -119,7 +138,8 @@ func (this *FileListDB) Init() error {
return err
}
this.insertStmt, err = this.writeDB.Prepare(`INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
this.insertSQL = `INSERT INTO "` + this.itemsTableName + `" ("hash", "key", "headerSize", "bodySize", "metaSize", "expiredAt", "staleAt", "host", "serverId", "createdAt") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
this.insertStmt, err = this.writeDB.Prepare(this.insertSQL)
if err != nil {
return err
}
@@ -129,7 +149,8 @@ func (this *FileListDB) Init() error {
return err
}
this.deleteByHashStmt, err = this.writeDB.Prepare(`DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`)
this.deleteByHashSQL = `DELETE FROM "` + this.itemsTableName + `" WHERE "hash"=?`
this.deleteByHashStmt, err = this.writeDB.Prepare(this.deleteByHashSQL)
if err != nil {
return err
}
@@ -181,12 +202,21 @@ func (this *FileListDB) Total() int64 {
return this.total
}
func (this *FileListDB) Add(hash string, item *Item) error {
func (this *FileListDB) AddAsync(hash string, item *Item) error {
if item.StaleAt == 0 {
item.StaleAt = item.ExpiredAt
}
this.writeBatch.Add(this.insertSQL, hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
return nil
}
func (this *FileListDB) AddSync(hash string, item *Item) error {
if item.StaleAt == 0 {
item.StaleAt = item.ExpiredAt
}
// 放入队列
_, err := this.insertStmt.Exec(hash, item.Key, item.HeaderSize, item.BodySize, item.MetaSize, item.ExpiredAt, item.StaleAt, item.Host, item.ServerId, utils.UnixTime())
if err != nil {
return this.WrapError(err)
@@ -195,6 +225,19 @@ func (this *FileListDB) Add(hash string, item *Item) error {
return nil
}
func (this *FileListDB) DeleteAsync(hash string) error {
this.writeBatch.Add(this.deleteByHashSQL, hash)
return nil
}
func (this *FileListDB) DeleteSync(hash string) error {
_, err := this.deleteByHashStmt.Exec(hash)
if err != nil {
return err
}
return nil
}
func (this *FileListDB) ListExpiredItems(count int) (hashList []string, err error) {
if !this.isReady {
return nil, nil
@@ -349,6 +392,11 @@ func (this *FileListDB) Close() error {
}
}
if this.writeBatch != nil {
this.writeBatch.Close()
}
if len(errStrings) == 0 {
return nil
}

View File

@@ -69,8 +69,8 @@ func TestFileList_Add_Many(t *testing.T) {
_ = list.Close()
}()
before := time.Now()
for i := 0; i < 100_000; i++ {
var before = time.Now()
for i := 0; i < 10_000_000; i++ {
u := "https://edge.teaos.cn/123456" + strconv.Itoa(i)
_ = list.Add(stringutil.Md5(u), &caches.Item{
Key: u,
@@ -290,12 +290,12 @@ func TestFileList_Stat(t *testing.T) {
}
func TestFileList_Count(t *testing.T) {
list := caches.NewFileList(Tea.Root + "/data")
var list = caches.NewFileList(Tea.Root + "/data")
err := list.Init()
if err != nil {
t.Fatal(err)
}
before := time.Now()
var before = time.Now()
count, err := list.Count()
if err != nil {
t.Fatal(err)
@@ -361,12 +361,26 @@ func TestFileList_UpgradeV3(t *testing.T) {
t.Log("ok")
}
func TestFileList_HashList(t *testing.T) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
t.Fatal(err)
}
prefixes, err := list.(*caches.FileList).FindAllPrefixes()
if err != nil {
t.Fatal(err)
}
t.Log(len(prefixes))
}
func BenchmarkFileList_Exist(b *testing.B) {
var list = caches.NewFileList(Tea.Root + "/data/cache-index/p1")
err := list.Init()
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = list.Exist("f0eb5b87e0b0041f3917002c0707475f" + types.String(i))
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.5.1"
Version = "0.5.2"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -1007,30 +1007,35 @@ func (this *HTTPWriter) finishWebP() {
Exact: true,
})
} else if gifImage != nil {
anim := gowebp.NewWebpAnimation(gifImage.Config.Width, gifImage.Config.Height, gifImage.LoopCount)
var anim = gowebp.NewWebpAnimation(gifImage.Config.Width, gifImage.Config.Height, gifImage.LoopCount)
anim.WebPAnimEncoderOptions.SetKmin(9)
anim.WebPAnimEncoderOptions.SetKmax(17)
defer anim.ReleaseMemory()
webpConfig := gowebp.NewWebpConfig()
var webpConfig = gowebp.NewWebpConfig()
//webpConfig.SetLossless(1)
webpConfig.SetQuality(f)
var timeline = 0
var lastErr error
for i, img := range gifImage.Image {
err = anim.AddFrame(img, timeline, webpConfig)
if err != nil {
break
// 有错误直接跳过
lastErr = err
err = nil
}
timeline += gifImage.Delay[i] * 10
}
if err == nil {
err = anim.AddFrame(nil, timeline, webpConfig)
if err == nil {
err = anim.Encode(this.writer)
}
if lastErr != nil {
remotelogs.Error("HTTP_WRITER", "'"+this.req.URL()+"' encode webp failed: "+lastErr.Error())
}
err = anim.AddFrame(nil, timeline, webpConfig)
if err == nil {
err = anim.Encode(this.writer)
}
anim.ReleaseMemory()
}
if err != nil && !this.req.canIgnore(err) {

149
internal/utils/dbs/batch.go Normal file
View File

@@ -0,0 +1,149 @@
// Copyright 2022 Liuxiangchao iwind.liu@gmail.com. All rights reserved. Official site: https://goedge.cn .
package dbs
import (
"database/sql"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"time"
)
type batchItem struct {
query string
args []any
}
type Batch struct {
db *sql.DB
n int
enableStat bool
onFail func(err error)
queue chan *batchItem
close chan bool
isClosed bool
}
func NewBatch(db *sql.DB, n int) *Batch {
return &Batch{
db: db,
n: n,
queue: make(chan *batchItem),
close: make(chan bool, 1),
}
}
func (this *Batch) EnableStat(b bool) {
this.enableStat = b
}
func (this *Batch) OnFail(callback func(err error)) {
this.onFail = callback
}
func (this *Batch) Add(query string, args ...any) {
this.queue <- &batchItem{
query: query,
args: args,
}
}
func (this *Batch) Exec() {
var n = this.n
if n <= 0 {
n = 4
}
var ticker = time.NewTicker(100 * time.Millisecond)
var count = 0
var lastTx *sql.Tx
For:
for {
// closed
if this.isClosed {
return
}
select {
case item := <-this.queue:
if lastTx == nil {
lastTx = this.beginTx()
if lastTx == nil {
continue For
}
}
err := this.execItem(lastTx, item)
if err != nil {
this.processErr(item.query, err)
}
count++
if count == n {
count = 0
err = lastTx.Commit()
lastTx = nil
if err != nil {
this.processErr("commit", err)
}
}
case <-ticker.C:
if lastTx == nil || count == 0 {
continue For
}
count = 0
err := lastTx.Commit()
lastTx = nil
if err != nil {
this.processErr("commit", err)
}
case <-this.close:
// closed
return
}
}
}
func (this *Batch) Close() {
this.isClosed = true
select {
case this.close <- true:
default:
}
}
func (this *Batch) beginTx() *sql.Tx {
tx, err := this.db.Begin()
if err != nil {
this.processErr("begin transaction", err)
return nil
}
return tx
}
func (this *Batch) execItem(tx *sql.Tx, item *batchItem) error {
if this.enableStat {
defer SharedQueryStatManager.AddQuery(item.query).End()
}
_, err := tx.Exec(item.query, item.args...)
return err
}
func (this *Batch) processErr(prefix string, err error) {
if err == nil {
return
}
if this.onFail != nil {
this.onFail(err)
} else {
remotelogs.Error("SQLITE_BATCH", prefix+": "+err.Error())
}
}

View File

@@ -80,3 +80,7 @@ func (this *DB) Close() error {
events.Remove(fmt.Sprintf("db_%p", this))
return this.rawDB.Close()
}
func (this *DB) RawDB() *sql.DB {
return this.rawDB
}