Compare commits

...

37 Commits

Author SHA1 Message Date
刘祥超
fa6be81abe 特殊页面可以直接使用HTML 2021-10-10 10:35:05 +08:00
刘祥超
1b2f01f0f4 把tcp/udp的连接数记为访问量,增加tcp域名排名记录(需要SNI连接) 2021-10-08 15:50:43 +08:00
刘祥超
09467a4d08 WAF模式从pass改为bypass 2021-10-07 13:51:26 +08:00
刘祥超
a17bbc3df1 缓存预热判断请求来源的时候增加IPv6回路地址判断 2021-10-06 16:35:39 +08:00
刘祥超
dc495c70b3 服务支持自定义访客IP地址获取方式/对X-Real-IP等Header值进行有效性验证 2021-10-06 11:40:48 +08:00
刘祥超
d0cf145f85 优化WAF动作排序 2021-10-06 09:39:57 +08:00
刘祥超
95f1e61489 WAF动作block和record_ip同时存在时,优先执行record_ip 2021-10-06 08:56:38 +08:00
刘祥超
42a0161312 WAF模式为defend时IP黑名单才生效 2021-10-04 18:22:51 +08:00
刘祥超
729443f0b4 大幅优化IP名单查询速度 2021-10-04 17:42:38 +08:00
刘祥超
3888565c0f 根据系统内存自动调节ttlcache的最大条目 2021-10-04 09:12:17 +08:00
刘祥超
1ca967534a 开启缓存后覆盖源站的ETag和Last-Modified 2021-10-04 08:41:44 +08:00
刘祥超
c01bb57dea 不把499状态码加入状态码统计 2021-10-04 08:41:13 +08:00
刘祥超
adadb52d4e 优化WebP+缓存 2021-10-03 18:00:57 +08:00
刘祥超
38a7cc17da 修复WAF策略模式为空导致动作不起作用的问题 2021-10-03 08:35:28 +08:00
刘祥超
246bb45614 限制WebP转换时消耗的内存总量 2021-10-01 18:59:44 +08:00
刘祥超
b320d2dc58 修复WebP缓存长度可能不正确的问题 2021-10-01 17:20:37 +08:00
刘祥超
96c63300f4 恢复源站默认连接数限制 2021-10-01 16:45:46 +08:00
刘祥超
3eaf090aac 缓存内容也支持压缩 2021-10-01 16:24:29 +08:00
刘祥超
b157448ad2 支持自动转换图像文件为WebP 2021-10-01 16:24:17 +08:00
刘祥超
44998a23fb 反向代理去除默认跟源站的连接数限制 2021-10-01 11:17:43 +08:00
刘祥超
e3e30ffee5 节点启动时如果加载的是本地配置则在网络恢复后重新加载配置 2021-10-01 11:13:36 +08:00
刘祥超
12bddc6e82 WAF策略增加观察模式和通过模式 2021-09-30 11:30:58 +08:00
刘祥超
771d2d8013 支持brotli和deflate压缩 2021-09-29 19:37:07 +08:00
刘祥超
3bf94bc032 优化WAF关闭连接操作 2021-09-29 11:06:00 +08:00
刘祥超
a1aa2b9224 Block动作增加默认时间60秒 2021-09-29 09:19:45 +08:00
刘祥超
8d28ba3426 缓存条件增加最小内容尺寸配置 2021-09-26 15:01:46 +08:00
刘祥超
5a72c10d83 版本改为0.3.2 2021-09-26 10:09:56 +08:00
刘祥超
ec113c59ab 将版本修改为0.3.1 2021-09-23 21:00:06 +08:00
刘祥超
7f9d95ba37 修复当缓存内容为空时无法响应缓存的Bug 2021-09-22 21:13:31 +08:00
刘祥超
558314265a 修改部分命名 2021-09-22 19:40:11 +08:00
刘祥超
c793dd9d8c 特殊页面中的URL抓取的内容也支持请求变量 2021-09-21 10:13:30 +08:00
刘祥超
33e899a008 特殊页面支持请求变量 2021-09-21 09:29:17 +08:00
刘祥超
903f511139 反向代理源站实现使用域名分组 2021-09-20 11:54:37 +08:00
刘祥超
49dafafdc5 修复反向代理Hash调度算法无法生效的Bug 2021-09-20 09:59:24 +08:00
刘祥超
c6b01dc10a 修复反向代理sticky调度算法无法生效的Bug 2021-09-20 09:55:42 +08:00
刘祥超
1119b351aa 内存缓存增加最大数量限制 2021-09-19 16:11:46 +08:00
刘祥超
7b8ef8e85b 配置加载成功后才启动某些任务 2021-09-16 15:58:10 +08:00
71 changed files with 1335 additions and 561 deletions

View File

@@ -9,7 +9,7 @@
<h3>403 Forbidden</h3>
<p>Sorry, your access to the page has been denied. Please try again later.</p>
<footer>Powered by TeaEdge.</footer>
<footer>Powered by GoEdge.</footer>
</body>
</html>

View File

@@ -9,7 +9,7 @@
<h3>404 Not Found</h3>
<p>Sorry, the page you are looking for is not found. Please try again later.</p>
<footer>Powered by TeaEdge.</footer>
<footer>Powered by GoEdge.</footer>
</body>
</html>

View File

@@ -9,7 +9,7 @@
<h3>An error occurred.</h3>
<p>Sorry, the page you are looking for is currently unavailable. Please try again later.</p>
<footer>Powered by TeaEdge.</footer>
<footer>Powered by GoEdge.</footer>
</body>
</html>

View File

@@ -9,7 +9,7 @@
<h3>The website is shutdown.</h3>
<p>Sorry, the page you are looking for is currently unavailable. Please try again later.</p>
<footer>Powered by TeaEdge.</footer>
<footer>Powered by GoEdge.</footer>
</body>
</html>

View File

@@ -9,7 +9,7 @@
<h3>网站升级中</h3>
<p>为了给您提供更好的服务,我们正在升级网站,请稍后重新访问。</p>
<footer>Powered by TeaEdge.</footer>
<footer>Powered by GoEdge.</footer>
</body>
</html>

View File

@@ -9,7 +9,7 @@
<h3>网站暂时关闭</h3>
<p>网站已被暂时关闭,请耐心等待我们的重新开通通知。</p>
<footer>Powered by TeaEdge.</footer>
<footer>Powered by GoEdge.</footer>
</body>
</html>

3
go.mod
View File

@@ -7,7 +7,9 @@ replace github.com/TeaOSLab/EdgeCommon => ../EdgeCommon
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/TeaOSLab/EdgeCommon v0.0.0-00010101000000-000000000000
github.com/andybalholm/brotli v1.0.3
github.com/cespare/xxhash v1.1.0
github.com/chai2010/webp v1.1.0
github.com/dchest/captcha v0.0.0-20200903113550-03f5f0333e1f
github.com/dop251/goja v0.0.0-20210804101310-32956a348b49
github.com/go-ole/go-ole v1.2.4 // indirect
@@ -21,6 +23,7 @@ require (
github.com/mssola/user_agent v0.5.2
github.com/shirou/gopsutil v3.21.5+incompatible
github.com/tklauser/go-sysconf v0.3.6 // indirect
golang.org/x/image v0.0.0-20190802002840-cff245a6509b
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
golang.org/x/text v0.3.6

5
go.sum
View File

@@ -7,12 +7,16 @@ github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM=
github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chai2010/webp v1.1.0 h1:4Ei0/BRroMF9FaXDG2e4OxwFcuW2vcXd+A6tyqTJUQQ=
github.com/chai2010/webp v1.1.0/go.mod h1:LP12PG5IFmLGHUU26tBiCBKnghxx3toZFwDjOYvd3Ow=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@@ -126,6 +130,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=

View File

@@ -188,7 +188,7 @@ func (this *FileList) Exist(hash string) (bool, error) {
var expiredAt int64
err = rows.Scan(&expiredAt)
if err != nil {
return true, nil
return false, nil
}
this.memoryCache.Write(hash, 1, expiredAt)
return true, nil

View File

@@ -63,6 +63,7 @@ func (this *FileReader) Init() error {
// body
bodySize := int(binary.BigEndian.Uint64(buf[SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength : SizeExpiresAt+SizeStatus+SizeURLLength+SizeHeaderLength+SizeBodyLength]))
if bodySize == 0 {
isOk = true
return nil
}
this.bodySize = int64(bodySize)

View File

@@ -0,0 +1,76 @@
package caches
import (
"github.com/TeaOSLab/EdgeNode/internal/compressions"
)
type compressionWriter struct {
rawWriter Writer
writer compressions.Writer
key string
expiredAt int64
}
func NewCompressionWriter(gw Writer, cpWriter compressions.Writer, key string, expiredAt int64) Writer {
return &compressionWriter{
rawWriter: gw,
writer: cpWriter,
key: key,
expiredAt: expiredAt,
}
}
func (this *compressionWriter) WriteHeader(data []byte) (n int, err error) {
return this.writer.Write(data)
}
// WriteHeaderLength 写入Header长度数据
func (this *compressionWriter) WriteHeaderLength(headerLength int) error {
return nil
}
// WriteBodyLength 写入Body长度数据
func (this *compressionWriter) WriteBodyLength(bodyLength int64) error {
return nil
}
func (this *compressionWriter) Write(data []byte) (n int, err error) {
return this.writer.Write(data)
}
func (this *compressionWriter) Close() error {
err := this.writer.Close()
if err != nil {
return err
}
return this.rawWriter.Close()
}
func (this *compressionWriter) Discard() error {
err := this.writer.Close()
if err != nil {
return err
}
return this.rawWriter.Discard()
}
func (this *compressionWriter) Key() string {
return this.key
}
func (this *compressionWriter) ExpiredAt() int64 {
return this.expiredAt
}
func (this *compressionWriter) HeaderSize() int64 {
return this.rawWriter.HeaderSize()
}
func (this *compressionWriter) BodySize() int64 {
return this.rawWriter.BodySize()
}
// ItemType 内容类型
func (this *compressionWriter) ItemType() ItemType {
return this.rawWriter.ItemType()
}

View File

@@ -1,76 +0,0 @@
package caches
import (
"compress/gzip"
)
type gzipWriter struct {
rawWriter Writer
writer *gzip.Writer
key string
expiredAt int64
}
func NewGzipWriter(gw Writer, key string, expiredAt int64) Writer {
return &gzipWriter{
rawWriter: gw,
writer: gzip.NewWriter(gw),
key: key,
expiredAt: expiredAt,
}
}
func (this *gzipWriter) WriteHeader(data []byte) (n int, err error) {
return this.writer.Write(data)
}
// 写入Header长度数据
func (this *gzipWriter) WriteHeaderLength(headerLength int) error {
return nil
}
// 写入Body长度数据
func (this *gzipWriter) WriteBodyLength(bodyLength int64) error {
return nil
}
func (this *gzipWriter) Write(data []byte) (n int, err error) {
return this.writer.Write(data)
}
func (this *gzipWriter) Close() error {
err := this.writer.Close()
if err != nil {
return err
}
return this.rawWriter.Close()
}
func (this *gzipWriter) Discard() error {
err := this.writer.Close()
if err != nil {
return err
}
return this.rawWriter.Discard()
}
func (this *gzipWriter) Key() string {
return this.key
}
func (this *gzipWriter) ExpiredAt() int64 {
return this.expiredAt
}
func (this *gzipWriter) HeaderSize() int64 {
return this.rawWriter.HeaderSize()
}
func (this *gzipWriter) BodySize() int64 {
return this.rawWriter.BodySize()
}
// 内容类型
func (this *gzipWriter) ItemType() ItemType {
return this.rawWriter.ItemType()
}

View File

@@ -0,0 +1,21 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"io"
)
func NewWriter(writer io.Writer, compressType serverconfigs.HTTPCompressionType, level int) (Writer, error) {
switch compressType {
case serverconfigs.HTTPCompressionTypeGzip:
return NewGzipWriter(writer, level)
case serverconfigs.HTTPCompressionTypeDeflate:
return NewDeflateWriter(writer, level)
case serverconfigs.HTTPCompressionTypeBrotli:
return NewBrotliWriter(writer, level)
}
return nil, errors.New("invalid compression type '" + compressType + "'")
}

View File

@@ -0,0 +1,10 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
type Writer interface {
Write(p []byte) (int, error)
Flush() error
Close() error
Level() int
}

View File

@@ -0,0 +1,41 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"github.com/andybalholm/brotli"
"io"
)
type BrotliWriter struct {
writer *brotli.Writer
level int
}
func NewBrotliWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = brotli.BestSpeed
} else if level > brotli.BestCompression {
level = brotli.BestCompression
}
return &BrotliWriter{
writer: brotli.NewWriterLevel(writer, level),
level: level,
}, nil
}
func (this *BrotliWriter) Write(p []byte) (int, error) {
return this.writer.Write(p)
}
func (this *BrotliWriter) Flush() error {
return this.writer.Flush()
}
func (this *BrotliWriter) Close() error {
return this.writer.Close()
}
func (this *BrotliWriter) Level() int {
return this.level
}

View File

@@ -0,0 +1,47 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"compress/flate"
"io"
)
type DeflateWriter struct {
writer *flate.Writer
level int
}
func NewDeflateWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = flate.BestSpeed
} else if level > flate.BestCompression {
level = flate.BestCompression
}
flateWriter, err := flate.NewWriter(writer, level)
if err != nil {
return nil, err
}
return &DeflateWriter{
writer: flateWriter,
level: level,
}, nil
}
func (this *DeflateWriter) Write(p []byte) (int, error) {
return this.writer.Write(p)
}
func (this *DeflateWriter) Flush() error {
return this.writer.Flush()
}
func (this *DeflateWriter) Close() error {
return this.writer.Close()
}
func (this *DeflateWriter) Level() int {
return this.level
}

View File

@@ -0,0 +1,47 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package compressions
import (
"compress/gzip"
"io"
)
type GzipWriter struct {
writer *gzip.Writer
level int
}
func NewGzipWriter(writer io.Writer, level int) (Writer, error) {
if level <= 0 {
level = gzip.BestSpeed
} else if level > gzip.BestCompression {
level = gzip.BestCompression
}
gzipWriter, err := gzip.NewWriterLevel(writer, level)
if err != nil {
return nil, err
}
return &GzipWriter{
writer: gzipWriter,
level: level,
}, nil
}
func (this *GzipWriter) Write(p []byte) (int, error) {
return this.writer.Write(p)
}
func (this *GzipWriter) Flush() error {
return this.writer.Flush()
}
func (this *GzipWriter) Close() error {
return this.writer.Close()
}
func (this *GzipWriter) Level() int {
return this.level
}

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.3.0"
Version = "0.3.2"
ProductName = "Edge Node"
ProcessName = "edge-node"

View File

@@ -10,7 +10,7 @@ const (
IPItemTypeAll IPItemType = "all" // 所有IP
)
// IP条目
// IPItem IP条目
type IPItem struct {
Type string `json:"type"`
Id int64 `json:"id"`
@@ -20,7 +20,7 @@ type IPItem struct {
EventLevel string `json:"eventLevel"`
}
// 检查是否包含某个IP
// Contains 检查是否包含某个IP
func (this *IPItem) Contains(ip uint64) bool {
switch this.Type {
case IPItemTypeIPv4:

View File

@@ -3,6 +3,7 @@ package iplibrary
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/assert"
"runtime"
"testing"
"time"
)
@@ -72,3 +73,36 @@ func TestIPItem_Contains(t *testing.T) {
a.IsTrue(item.Contains(utils.IP2Long("192.168.1.1")))
}
}
func TestIPItem_Memory(t *testing.T) {
var list = NewIPList()
for i := 0; i < 2_000_000; i ++ {
list.Add(&IPItem{
Type: "ip",
Id: int64(i),
IPFrom: utils.IP2Long("192.168.1.1"),
IPTo: 0,
ExpiredAt: time.Now().Unix(),
EventLevel: "",
})
}
t.Log("waiting")
time.Sleep(10 * time.Second)
}
func BenchmarkIPItem_Contains(b *testing.B) {
runtime.GOMAXPROCS(1)
item := &IPItem{
IPFrom: utils.IP2Long("192.168.1.1"),
IPTo: utils.IP2Long("192.168.1.101"),
ExpiredAt: 0,
}
ip := utils.IP2Long("192.168.1.1")
for i := 0; i < b.N; i++ {
for j := 0; j < 10_000; j++ {
item.Contains(ip)
}
}
}

View File

@@ -3,24 +3,26 @@ package iplibrary
import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/TeaOSLab/EdgeNode/internal/utils/expires"
"sort"
"sync"
)
// IPList IP名单
// TODO IP名单可以分片关闭这样让每一片的数据量减少查询更快
type IPList struct {
itemsMap map[int64]*IPItem // id => item
ipMap map[uint64][]int64 // ip => itemIds
expireList *expires.List
itemsMap map[int64]*IPItem // id => item
sortedItems []*IPItem
allItemsMap map[int64]*IPItem // id => item
isAll bool
expireList *expires.List
locker sync.RWMutex
}
func NewIPList() *IPList {
list := &IPList{
itemsMap: map[int64]*IPItem{},
ipMap: map[uint64][]int64{},
itemsMap: map[int64]*IPItem{},
allItemsMap: map[int64]*IPItem{},
}
expireList := expires.NewList()
@@ -34,14 +36,94 @@ func NewIPList() *IPList {
}
func (this *IPList) Add(item *IPItem) {
this.addItem(item, true)
}
// AddDelay 延迟添加需要手工调用Sort()函数
func (this *IPList) AddDelay(item *IPItem) {
this.addItem(item, false)
}
func (this *IPList) Sort() {
this.locker.Lock()
this.sortItems()
this.locker.Unlock()
}
func (this *IPList) Delete(itemId int64) {
this.locker.Lock()
this.deleteItem(itemId)
this.locker.Unlock()
}
// Contains 判断是否包含某个IP
func (this *IPList) Contains(ip uint64) bool {
this.locker.RLock()
if len(this.allItemsMap) > 0 {
this.locker.RUnlock()
return true
}
var item = this.lookupIP(ip)
this.locker.RUnlock()
return item != nil
}
// ContainsIPStrings 是否包含一组IP中的任意一个并返回匹配的第一个Item
func (this *IPList) ContainsIPStrings(ipStrings []string) (item *IPItem, found bool) {
if len(ipStrings) == 0 {
return
}
this.locker.RLock()
if len(this.allItemsMap) > 0 {
for _, allItem := range this.allItemsMap {
item = allItem
break
}
if item != nil {
this.locker.RUnlock()
found = true
return
}
}
for _, ipString := range ipStrings {
if len(ipString) == 0 {
continue
}
item = this.lookupIP(utils.IP2Long(ipString))
if item != nil {
this.locker.RUnlock()
found = true
return
}
}
this.locker.RUnlock()
return
}
func (this *IPList) addItem(item *IPItem, sortable bool) {
if item == nil {
return
}
if item.ExpiredAt > 0 && item.ExpiredAt < utils.UnixTime() {
return
}
if item.IPFrom == 0 && item.IPTo == 0 {
if item.Type != "all" {
if item.Type != IPItemTypeAll {
return
}
} else if item.IPTo > 0 {
if item.IPFrom > item.IPTo {
item.IPFrom, item.IPTo = item.IPTo, item.IPFrom
} else if item.IPFrom == 0 {
item.IPFrom = item.IPTo
item.IPTo = 0
}
}
this.locker.Lock()
@@ -56,157 +138,86 @@ func (this *IPList) Add(item *IPItem) {
// 展开
if item.IPFrom > 0 {
if item.IPTo == 0 {
this.addIP(item.IPFrom, item.Id)
} else {
if item.IPFrom > item.IPTo {
item.IPTo, item.IPFrom = item.IPFrom, item.IPTo
}
for i := item.IPFrom; i <= item.IPTo; i++ {
// 最多不能超过65535防止整个系统内存爆掉
if i >= item.IPFrom+65535 {
break
}
this.addIP(i, item.Id)
}
}
} else if item.IPTo > 0 {
this.addIP(item.IPTo, item.Id)
this.sortedItems = append(this.sortedItems, item)
} else {
this.addIP(0, item.Id)
// 更新isAll
this.isAll = true
this.allItemsMap[item.Id] = item
}
if item.ExpiredAt > 0 {
this.expireList.Add(item.Id, item.ExpiredAt)
}
if sortable {
this.sortItems()
}
this.locker.Unlock()
}
func (this *IPList) Delete(itemId int64) {
this.locker.Lock()
defer this.locker.Unlock()
this.deleteItem(itemId)
// 更新isAll
this.isAll = len(this.ipMap[0]) > 0
// 对列表进行排序
func (this *IPList) sortItems() {
sort.Slice(this.sortedItems, func(i, j int) bool {
var item1 = this.sortedItems[i]
var item2 = this.sortedItems[j]
if item1.IPFrom == item2.IPFrom {
return item1.IPTo < item2.IPTo
}
return item1.IPFrom < item2.IPFrom
})
}
// Contains 判断是否包含某个IP
func (this *IPList) Contains(ip uint64) bool {
this.locker.RLock()
if this.isAll {
this.locker.RUnlock()
return true
}
_, ok := this.ipMap[ip]
this.locker.RUnlock()
return ok
}
// ContainsIPStrings 是否包含一组IP
func (this *IPList) ContainsIPStrings(ipStrings []string) (found bool, item *IPItem) {
if len(ipStrings) == 0 {
return
}
this.locker.RLock()
if this.isAll {
itemIds := this.ipMap[0]
if len(itemIds) > 0 {
itemId := itemIds[0]
item = this.itemsMap[itemId]
}
this.locker.RUnlock()
found = true
return
}
for _, ipString := range ipStrings {
if len(ipString) == 0 {
continue
}
itemIds, ok := this.ipMap[utils.IP2Long(ipString)]
if ok {
if len(itemIds) > 0 {
itemId := itemIds[0]
item = this.itemsMap[itemId]
// 不加锁的情况下查找Item
func (this *IPList) lookupIP(ip uint64) *IPItem {
var count = len(this.sortedItems)
var resultIndex = -1
sort.Search(count, func(i int) bool {
var item = this.sortedItems[i]
if item.IPFrom < ip {
if item.IPTo >= ip {
resultIndex = i
}
this.locker.RUnlock()
found = true
return
return false
} else if item.IPFrom == ip {
resultIndex = i
return false
}
return true
})
if resultIndex < 0 || resultIndex >= count {
return nil
}
this.locker.RUnlock()
return
return this.sortedItems[resultIndex]
}
// 在不加锁的情况下删除某个Item
// 将会被别的方法引用,切记不能加锁
func (this *IPList) deleteItem(itemId int64) {
item, ok := this.itemsMap[itemId]
_, ok := this.itemsMap[itemId]
if !ok {
return
}
delete(this.itemsMap, itemId)
// 展开
if item.IPFrom > 0 {
if item.IPTo == 0 {
this.deleteIP(item.IPFrom, item.Id)
} else {
if item.IPFrom > item.IPTo {
item.IPTo, item.IPFrom = item.IPFrom, item.IPTo
}
for i := item.IPFrom; i <= item.IPTo; i++ {
// 最多不能超过65535防止整个系统内存爆掉
if i >= item.IPFrom+65535 {
break
}
this.deleteIP(i, item.Id)
}
}
} else if item.IPTo > 0 {
this.deleteIP(item.IPTo, item.Id)
} else {
this.deleteIP(0, item.Id)
}
}
// 添加单个IP
func (this *IPList) addIP(ip uint64, itemId int64) {
itemIds, ok := this.ipMap[ip]
// 是否为All Item
_, ok = this.allItemsMap[itemId]
if ok {
itemIds = append(itemIds, itemId)
} else {
itemIds = []int64{itemId}
}
this.ipMap[ip] = itemIds
}
// 删除单个IP
func (this *IPList) deleteIP(ip uint64, itemId int64) {
itemIds, ok := this.ipMap[ip]
if !ok {
delete(this.allItemsMap, itemId)
return
}
newItemIds := []int64{}
for _, oldItemId := range itemIds {
if oldItemId == itemId {
continue
// 删除排序中的Item
var index = -1
for itemIndex, item := range this.sortedItems {
if item.Id == itemId {
index = itemIndex
break
}
newItemIds = append(newItemIds, oldItemId)
}
if len(newItemIds) > 0 {
this.ipMap[ip] = newItemIds
} else {
delete(this.ipMap, ip)
if index >= 0 {
copy(this.sortedItems[index:], this.sortedItems[index+1:])
this.sortedItems = this.sortedItems[:len(this.sortedItems)-1]
}
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/assert"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/rands"
"runtime"
"strconv"
"testing"
@@ -16,7 +17,7 @@ func TestIPList_Add_Empty(t *testing.T) {
Id: 1,
})
logs.PrintAsJSON(ipList.itemsMap, t)
logs.PrintAsJSON(ipList.ipMap, t)
logs.PrintAsJSON(ipList.allItemsMap, t)
}
func TestIPList_Add_One(t *testing.T) {
@@ -31,15 +32,30 @@ func TestIPList_Add_One(t *testing.T) {
})
ipList.Add(&IPItem{
Id: 3,
IPFrom: utils.IP2Long("2001:db8:0:1::101"),
IPFrom: utils.IP2Long("192.168.0.2"),
})
ipList.Add(&IPItem{
Id: 4,
IPFrom: utils.IP2Long("192.168.0.2"),
IPTo: utils.IP2Long("192.168.0.1"),
})
ipList.Add(&IPItem{
Id: 5,
IPFrom: utils.IP2Long("2001:db8:0:1::101"),
})
ipList.Add(&IPItem{
Id: 6,
IPFrom: 0,
Type: "all",
})
t.Log("===items===")
logs.PrintAsJSON(ipList.itemsMap, t)
logs.PrintAsJSON(ipList.ipMap, t) // ip => items
t.Log("===sorted items===")
logs.PrintAsJSON(ipList.sortedItems, t)
t.Log("===all items===")
logs.PrintAsJSON(ipList.allItemsMap, t) // ip => items
}
func TestIPList_Update(t *testing.T) {
@@ -50,14 +66,31 @@ func TestIPList_Update(t *testing.T) {
})
/**ipList.Add(&IPItem{
Id: 2,
IPFrom: IP2Long("192.168.1.1"),
IPFrom: utils.IP2Long("192.168.1.1"),
})**/
ipList.Add(&IPItem{
Id: 1,
IPTo: utils.IP2Long("192.168.1.2"),
})
logs.PrintAsJSON(ipList.itemsMap, t)
logs.PrintAsJSON(ipList.ipMap, t)
logs.PrintAsJSON(ipList.sortedItems, t)
}
func TestIPList_Update_AllItems(t *testing.T) {
ipList := NewIPList()
ipList.Add(&IPItem{
Id: 1,
Type: IPItemTypeAll,
IPFrom: 0,
})
ipList.Add(&IPItem{
Id: 1,
IPTo: 0,
})
t.Log("===items map===")
logs.PrintAsJSON(ipList.itemsMap, t)
t.Log("===all items map===")
logs.PrintAsJSON(ipList.allItemsMap, t)
}
func TestIPList_Add_Range(t *testing.T) {
@@ -71,9 +104,9 @@ func TestIPList_Add_Range(t *testing.T) {
Id: 2,
IPTo: utils.IP2Long("192.168.1.2"),
})
t.Log(len(ipList.ipMap), "ips")
t.Log(len(ipList.itemsMap), "ips")
logs.PrintAsJSON(ipList.itemsMap, t)
logs.PrintAsJSON(ipList.ipMap, t)
logs.PrintAsJSON(ipList.allItemsMap, t)
}
func TestIPList_Add_Overflow(t *testing.T) {
@@ -85,8 +118,8 @@ func TestIPList_Add_Overflow(t *testing.T) {
IPFrom: utils.IP2Long("192.168.1.1"),
IPTo: utils.IP2Long("192.169.255.1"),
})
t.Log(len(ipList.ipMap), "ips")
a.IsTrue(len(ipList.ipMap) <= 65535)
t.Log(len(ipList.itemsMap), "ips")
a.IsTrue(len(ipList.itemsMap) <= 65535)
}
func TestNewIPList_Memory(t *testing.T) {
@@ -104,20 +137,50 @@ func TestNewIPList_Memory(t *testing.T) {
}
func TestIPList_Contains(t *testing.T) {
var a = assert.NewAssertion(t)
list := NewIPList()
for i := 0; i < 255; i++ {
list.Add(&IPItem{
list.AddDelay(&IPItem{
Id: int64(i),
IPFrom: utils.IP2Long(strconv.Itoa(i) + ".168.0.1"),
IPTo: utils.IP2Long(strconv.Itoa(i) + ".168.255.1"),
ExpiredAt: 0,
})
}
t.Log(len(list.ipMap), "ip")
for i := 0; i < 255; i++ {
list.AddDelay(&IPItem{
Id: int64(1000 + i),
IPFrom: utils.IP2Long("192.167.2." + strconv.Itoa(i)),
})
}
list.Sort()
t.Log(len(list.itemsMap), "ip")
before := time.Now()
t.Log(list.Contains(utils.IP2Long("192.168.1.100")))
t.Log(list.Contains(utils.IP2Long("192.168.2.100")))
a.IsTrue(list.Contains(utils.IP2Long("192.168.1.100")))
a.IsTrue(list.Contains(utils.IP2Long("192.168.2.100")))
a.IsFalse(list.Contains(utils.IP2Long("192.169.3.100")))
a.IsFalse(list.Contains(utils.IP2Long("192.167.3.100")))
a.IsTrue(list.Contains(utils.IP2Long("192.167.2.100")))
t.Log(time.Since(before).Seconds()*1000, "ms")
}
func TestIPList_Contains_Many(t *testing.T) {
list := NewIPList()
for i := 0; i < 1_000_000; i++ {
list.AddDelay(&IPItem{
Id: int64(i),
IPFrom: utils.IP2Long(strconv.Itoa(rands.Int(0, 255)) + "." + strconv.Itoa(rands.Int(0, 255)) + "." + strconv.Itoa(rands.Int(0, 255)) + "." + strconv.Itoa(rands.Int(0, 255))),
IPTo: utils.IP2Long(strconv.Itoa(rands.Int(0, 255)) + "." + strconv.Itoa(rands.Int(0, 255)) + "." + strconv.Itoa(rands.Int(0, 255)) + "." + strconv.Itoa(rands.Int(0, 255))),
ExpiredAt: 0,
})
}
list.Sort()
t.Log(len(list.itemsMap), "ip")
before := time.Now()
_ = list.Contains(utils.IP2Long("192.168.1.100"))
t.Log(time.Since(before).Seconds()*1000, "ms")
}
@@ -146,6 +209,32 @@ func TestIPList_ContainsAll(t *testing.T) {
}
func TestIPList_ContainsIPStrings(t *testing.T) {
var a = assert.NewAssertion(t)
list := NewIPList()
for i := 0; i < 255; i++ {
list.Add(&IPItem{
Id: int64(i),
IPFrom: utils.IP2Long(strconv.Itoa(i) + ".168.0.1"),
IPTo: utils.IP2Long(strconv.Itoa(i) + ".168.255.1"),
ExpiredAt: 0,
})
}
t.Log(len(list.itemsMap), "ip")
{
item, ok := list.ContainsIPStrings([]string{"192.168.1.100"})
t.Log("item:", item)
a.IsTrue(ok)
}
{
item, ok := list.ContainsIPStrings([]string{"192.167.1.100"})
t.Log("item:", item)
a.IsFalse(ok)
}
}
func TestIPList_Delete(t *testing.T) {
list := NewIPList()
list.Add(&IPItem{
@@ -160,13 +249,13 @@ func TestIPList_Delete(t *testing.T) {
})
t.Log("===BEFORE===")
logs.PrintAsJSON(list.itemsMap, t)
logs.PrintAsJSON(list.ipMap, t)
logs.PrintAsJSON(list.allItemsMap, t)
list.Delete(1)
t.Log("===AFTER===")
logs.PrintAsJSON(list.itemsMap, t)
logs.PrintAsJSON(list.ipMap, t)
logs.PrintAsJSON(list.allItemsMap, t)
}
func TestGC(t *testing.T) {
@@ -184,27 +273,27 @@ func TestGC(t *testing.T) {
ExpiredAt: 0,
})
logs.PrintAsJSON(list.itemsMap, t)
logs.PrintAsJSON(list.ipMap, t)
logs.PrintAsJSON(list.allItemsMap, t)
time.Sleep(2 * time.Second)
t.Log("===AFTER GC===")
logs.PrintAsJSON(list.itemsMap, t)
logs.PrintAsJSON(list.ipMap, t)
logs.PrintAsJSON(list.sortedItems, t)
}
func BenchmarkIPList_Contains(b *testing.B) {
runtime.GOMAXPROCS(1)
list := NewIPList()
for i := 192; i < 194; i++ {
for i := 1; i < 194; i++ {
list.Add(&IPItem{
Id: int64(1),
IPFrom: utils.IP2Long(strconv.Itoa(i) + ".1.0.1"),
IPTo: utils.IP2Long(strconv.Itoa(i) + ".2.0.1"),
Id: int64(i),
IPFrom: utils.IP2Long(strconv.Itoa(i%255) + "." + strconv.Itoa(i%255) + ".0.1"),
IPTo: utils.IP2Long(strconv.Itoa(i%255) + "." + strconv.Itoa(i%255) + ".0.1"),
ExpiredAt: time.Now().Unix() + 60,
})
}
b.Log(len(list.ipMap), "ip")
b.Log(len(list.itemsMap), "ip")
for i := 0; i < b.N; i++ {
_ = list.Contains(utils.IP2Long("192.168.1.100"))
}

View File

@@ -5,9 +5,9 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/files"
"github.com/iwind/TeaGo/logs"
"github.com/iwind/TeaGo/types"
"regexp"
"strings"
@@ -21,7 +21,7 @@ func init() {
// 初始化
library, err := SharedManager.Load()
if err != nil {
logs.Println("[IP_LIBRARY]" + err.Error())
remotelogs.Error("IP_LIBRARY", err.Error())
return
}
SharedLibrary = library

View File

@@ -20,12 +20,12 @@ import (
var SharedCountryManager = NewCountryManager()
func init() {
events.On(events.EventStart, func() {
events.On(events.EventLoaded, func() {
go SharedCountryManager.Start()
})
}
// 国家信息管理
// CountryManager 国家/地区信息管理
type CountryManager struct {
cacheFile string

View File

@@ -15,12 +15,12 @@ var SharedIPListManager = NewIPListManager()
var IPListUpdateNotify = make(chan bool, 1)
func init() {
events.On(events.EventStart, func() {
events.On(events.EventLoaded, func() {
go SharedIPListManager.Start()
})
}
// IP名单管理
// IPListManager IP名单管理
type IPListManager struct {
// 缓存文件
// 每行一个数据id|from|to|expiredAt
@@ -112,12 +112,16 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
return false, nil
}
this.locker.Lock()
var changedLists = map[*IPList]bool{}
for _, item := range items {
list, ok := this.listMap[item.ListId]
if !ok {
list = NewIPList()
this.listMap[item.ListId] = list
}
changedLists[list] = true
if item.IsDeleted {
list.Delete(item.Id)
@@ -127,7 +131,7 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
continue
}
list.Add(&IPItem{
list.AddDelay(&IPItem{
Id: item.Id,
Type: item.Type,
IPFrom: utils.IP2Long(item.IpFrom),
@@ -140,6 +144,11 @@ func (this *IPListManager) fetch() (hasNext bool, err error) {
SharedActionManager.DeleteItem(item.ListType, item)
SharedActionManager.AddItem(item.ListType, item)
}
for changedList := range changedLists {
changedList.Sort()
}
this.locker.Unlock()
this.version = items[len(items)-1].Version

View File

@@ -24,12 +24,12 @@ const (
var SharedProvinceManager = NewProvinceManager()
func init() {
events.On(events.EventStart, func() {
events.On(events.EventLoaded, func() {
go SharedProvinceManager.Start()
})
}
// 国家信息管理
// ProvinceManager 中国省份信息管理
type ProvinceManager struct {
cacheFile string

View File

@@ -7,9 +7,9 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/errors"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"os"
"time"
)
@@ -21,16 +21,16 @@ func init() {
})
}
// IP库更新程序
// Updater IP库更新程序
type Updater struct {
}
// 获取新对象
// NewUpdater 获取新对象
func NewUpdater() *Updater {
return &Updater{}
}
// 开始更新
// Start 开始更新
func (this *Updater) Start() {
// 这里不需要太频繁检查更新因为通常不需要更新IP库
ticker := time.NewTicker(1 * time.Hour)
@@ -38,7 +38,7 @@ func (this *Updater) Start() {
for range ticker.C {
err := this.loop()
if err != nil {
logs.Println("[IP_LIBRARY]" + err.Error())
remotelogs.Error("IP_LIBRARY", err.Error())
}
}
}()

View File

@@ -212,6 +212,12 @@ func (this *Task) Add(obj MetricInterface) {
var keys = []string{}
for _, key := range this.item.Keys {
k := obj.MetricKey(key)
// 忽略499状态
if key == "${status}" && k == "499" {
return
}
keys = append(keys, k)
}

View File

@@ -15,7 +15,7 @@ import (
var SharedValueQueue = NewValueQueue()
func init() {
events.On(events.EventStart, func() {
events.On(events.EventLoaded, func() {
go SharedValueQueue.Start()
})
}

View File

@@ -145,6 +145,16 @@ func (this *APIStream) handleConnectedAPINode(message *pb.NodeStreamMessage) err
return errors.Wrap(err)
}
remotelogs.Println("API_STREAM", "connected to api node '"+strconv.FormatInt(msg.APINodeId, 10)+"'")
// 重新读取配置
if nodeConfigUpdatedAt == 0 {
select {
case nodeConfigChangedNotify <- true:
default:
}
}
return nil
}
@@ -332,6 +342,15 @@ func (this *APIStream) handlePurgeCache(message *pb.NodeStreamMessage) error {
}()
}
// WEBP缓存
if msg.Type == "file" {
var keys = msg.Keys
for _, key := range keys {
keys = append(keys, key+webpSuffix)
}
msg.Keys = keys
}
err = storage.Purge(msg.Keys, msg.Type)
if err != nil {
this.replyFail(message.RequestId, "purge keys failed: "+err.Error())

View File

@@ -150,9 +150,9 @@ func (this *HTTPRequest) Do() {
}
}
// Gzip
if this.web.GzipRef != nil && this.web.GzipRef.IsOn && this.web.Gzip != nil && this.web.Gzip.IsOn && this.web.Gzip.Level > 0 {
this.writer.Gzip(this.web.Gzip)
// Compression
if this.web.Compression != nil && this.web.Compression.IsOn && this.web.Compression.Level > 0 {
this.writer.SetCompression(this.web.Compression)
}
// 开始调用
@@ -322,6 +322,11 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
this.web.Root = web.Root
}
// remote addr
if web.RemoteAddr != nil && (web.RemoteAddr.IsPrior || isTop) && web.RemoteAddr.IsOn {
this.web.RemoteAddr = web.RemoteAddr
}
// charset
if web.Charset != nil && (web.Charset.IsPrior || isTop) {
this.web.Charset = web.Charset
@@ -333,10 +338,14 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
this.web.Websocket = web.Websocket
}
// gzip
if web.GzipRef != nil && (web.GzipRef.IsPrior || isTop) {
this.web.GzipRef = web.GzipRef
this.web.Gzip = web.Gzip
// compression
if web.Compression != nil && (web.Compression.IsPrior || isTop) {
this.web.Compression = web.Compression
}
// webp
if web.WebP != nil && (web.WebP.IsPrior || isTop) {
this.web.WebP = web.WebP
}
// cache
@@ -501,7 +510,9 @@ func (this *HTTPRequest) Format(source string) string {
case "edgeVersion":
return teaconst.Version
case "remoteAddr":
return this.requestRemoteAddr()
return this.requestRemoteAddr(true)
case "remoteAddrValue":
return this.requestRemoteAddr(false)
case "rawRemoteAddr":
addr := this.RawReq.RemoteAddr
host, _, err := net.SplitHostPort(addr)
@@ -757,22 +768,36 @@ func (this *HTTPRequest) addVarMapping(varMapping map[string]string) {
}
// 获取请求的客户端地址
func (this *HTTPRequest) requestRemoteAddr() string {
func (this *HTTPRequest) requestRemoteAddr(supportVar bool) string {
if supportVar &&
this.web.RemoteAddr != nil &&
this.web.RemoteAddr.IsOn &&
!this.web.RemoteAddr.IsEmpty() {
var remoteAddr = this.Format(this.web.RemoteAddr.Value)
if net.ParseIP(remoteAddr) != nil {
return remoteAddr
}
}
// X-Forwarded-For
forwardedFor := this.RawReq.Header.Get("X-Forwarded-For")
if len(forwardedFor) > 0 {
commaIndex := strings.Index(forwardedFor, ",")
if commaIndex > 0 {
return forwardedFor[:commaIndex]
forwardedFor = forwardedFor[:commaIndex]
}
if net.ParseIP(forwardedFor) != nil {
return forwardedFor
}
return forwardedFor
}
// Real-IP
{
realIP, ok := this.RawReq.Header["X-Real-IP"]
if ok && len(realIP) > 0 {
return realIP[0]
if net.ParseIP(realIP[0]) != nil {
return realIP[0]
}
}
}
@@ -780,7 +805,9 @@ func (this *HTTPRequest) requestRemoteAddr() string {
{
realIP, ok := this.RawReq.Header["X-Real-Ip"]
if ok && len(realIP) > 0 {
return realIP[0]
if net.ParseIP(realIP[0]) != nil {
return realIP[0]
}
}
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
@@ -23,7 +24,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}
// 判断是否在预热
if strings.HasPrefix(this.RawReq.RemoteAddr, "127.") && this.RawReq.Header.Get("X-Cache-Action") == "preheat" {
if (strings.HasPrefix(this.RawReq.RemoteAddr, "127.") || strings.HasPrefix(this.RawReq.RemoteAddr, "[::1]")) && this.RawReq.Header.Get("X-Cache-Action") == "preheat" {
return
}
@@ -98,6 +99,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
this.cacheRef = nil
return
}
this.cacheKey = key
// 读取缓存
@@ -112,18 +114,32 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
bytePool32k.Put(buf)
}()
reader, err := storage.OpenReader(key)
if err != nil {
if err == caches.ErrNotFound {
// cache相关变量
this.varMapping["cache.status"] = "MISS"
var reader caches.Reader
var err error
// 是否优先检查WebP
if this.web.WebP != nil &&
this.web.WebP.IsOn &&
this.web.WebP.MatchRequest(filepath.Ext(this.requestPath()), this.Format) &&
this.web.WebP.MatchAccept(this.requestHeader("Accept")) {
reader, _ = storage.OpenReader(key + webpSuffix)
}
// 检查正常的文件
if reader == nil {
reader, err = storage.OpenReader(key)
if err != nil {
if err == caches.ErrNotFound {
// cache相关变量
this.varMapping["cache.status"] = "MISS"
return
}
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_CACHE", "read from cache failed: "+err.Error())
}
return
}
defer func() {
_ = reader.Close()
@@ -165,25 +181,22 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}
// ETag
// 这里强制设置ETag如果先前源站设置了ETag将会被覆盖避免因为源站的ETag导致源站返回304 Not Modified
var respHeader = this.writer.Header()
var eTag = respHeader.Get("ETag")
var eTag = ""
var lastModifiedAt = reader.LastModified()
if len(eTag) == 0 {
if lastModifiedAt > 0 {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
respHeader["ETag"] = []string{eTag}
}
if lastModifiedAt > 0 {
eTag = "\"" + strconv.FormatInt(lastModifiedAt, 10) + "\""
respHeader.Del("Etag")
respHeader["ETag"] = []string{eTag}
}
// 支持 Last-Modified
var modifiedTime = respHeader.Get("Last-Modified")
if len(modifiedTime) == 0 {
if lastModifiedAt > 0 {
modifiedTime = time.Unix(lastModifiedAt, 0).Format("Mon, 02 Jan 2006 15:04:05 GMT")
if len(respHeader.Get("Last-Modified")) == 0 {
respHeader.Set("Last-Modified", modifiedTime)
}
}
// 这里强制设置Last-Modified如果先前源站设置了Last-Modified将会被覆盖避免因为源站的Last-Modified导致源站返回304 Not Modified
var modifiedTime = ""
if lastModifiedAt > 0 {
modifiedTime = time.Unix(lastModifiedAt, 0).Format("Mon, 02 Jan 2006 15:04:05 GMT")
respHeader.Set("Last-Modified", modifiedTime)
}
// 支持 If-None-Match
@@ -193,6 +206,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
this.writer.WriteHeader(http.StatusNotModified)
this.isCached = true
this.cacheRef = nil
this.writer.SetOk()
return true
}
@@ -203,6 +217,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
this.writer.WriteHeader(http.StatusNotModified)
this.isCached = true
this.cacheRef = nil
this.writer.SetOk()
return true
}
@@ -348,6 +363,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return true
}
} else { // 没有Range
this.writer.PrepareCompression(reader.BodySize())
this.writer.WriteHeader(reader.Status())
err = reader.ReadBody(buf, func(n int) (goNext bool, err error) {
@@ -368,5 +384,8 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
this.isCached = true
this.cacheRef = nil
this.writer.SetOk()
return true
}

View File

@@ -41,7 +41,7 @@ func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
}
if !env.Has("REMOTE_ADDR") {
env["REMOTE_ADDR"] = this.requestRemoteAddr()
env["REMOTE_ADDR"] = this.requestRemoteAddr(true)
}
if !env.Has("QUERY_STRING") {
u, err := url.ParseRequestURI(this.uri)

View File

@@ -88,7 +88,7 @@ func (this *HTTPRequest) log() {
RequestId: strconv.FormatInt(this.requestFromTime.UnixNano(), 10) + strconv.FormatInt(atomic.AddInt64(&requestId, 1), 10) + sharedNodeConfig.PaddedId(),
NodeId: sharedNodeConfig.Id,
ServerId: this.Server.Id,
RemoteAddr: this.requestRemoteAddr(),
RemoteAddr: this.requestRemoteAddr(true),
RawRemoteAddr: addr,
RemotePort: int32(this.requestRemotePort()),
RemoteUser: this.requestRemoteUser(),

View File

@@ -1,10 +1,11 @@
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"io"
"net/http"
"os"
"regexp"
@@ -20,24 +21,54 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
for _, page := range this.web.Pages {
if page.Match(status) {
if urlPrefixRegexp.MatchString(page.URL) {
this.doURL(http.MethodGet, page.URL, "", page.NewStatus)
return true
} else {
file := Tea.Root + Tea.DS + page.URL
fp, err := os.Open(file)
if err != nil {
logs.Error(err)
msg := "404 page not found: '" + page.URL + "'"
if len(page.BodyType) == 0 || page.BodyType == shared.BodyTypeURL {
if urlPrefixRegexp.MatchString(page.URL) {
this.doURL(http.MethodGet, page.URL, "", page.NewStatus, true)
return true
} else {
file := Tea.Root + Tea.DS + page.URL
fp, err := os.Open(file)
if err != nil {
logs.Error(err)
msg := "404 page not found: '" + page.URL + "'"
this.writer.WriteHeader(http.StatusNotFound)
_, err := this.writer.Write([]byte(msg))
this.writer.WriteHeader(http.StatusNotFound)
_, err := this.writer.Write([]byte(msg))
if err != nil {
logs.Error(err)
}
return true
}
// 修改状态码
if page.NewStatus > 0 {
// 自定义响应Headers
this.processResponseHeaders(page.NewStatus)
this.writer.WriteHeader(page.NewStatus)
} else {
this.processResponseHeaders(status)
this.writer.WriteHeader(status)
}
buf := bytePool1k.Get()
_, err = utils.CopyWithFilter(this.writer, fp, buf, func(p []byte) []byte {
return []byte(this.Format(string(p)))
})
bytePool1k.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {
logs.Error(err)
}
return true
}
return true
} else if page.BodyType == shared.BodyTypeHTML {
// 修改状态码
if page.NewStatus > 0 {
// 自定义响应Headers
@@ -47,9 +78,8 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
this.processResponseHeaders(status)
this.writer.WriteHeader(status)
}
buf := bytePool1k.Get()
_, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf)
_, err := this.writer.WriteString(this.Format(page.Body))
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_PAGE", "write to client failed: "+err.Error())
@@ -57,13 +87,8 @@ func (this *HTTPRequest) doPage(status int) (shouldStop bool) {
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {
logs.Error(err)
}
return true
}
return true
}
}
return false

View File

@@ -33,7 +33,11 @@ func (this *HTTPRequest) doReverseProxy() {
// 源站
requestCall := shared.NewRequestCall()
requestCall.Request = this.RawReq
requestCall.Formatter = this.Format
requestCall.Domain = this.Host
origin := this.reverseProxy.NextOrigin(requestCall)
requestCall.CallResponseCallbacks(this.writer)
if origin == nil {
err := errors.New(this.requestPath() + ": no available backends for reverse proxy")
remotelogs.Error("HTTP_REQUEST_REVERSE_PROXY", err.Error())

View File

@@ -19,7 +19,7 @@ func (this *HTTPRequest) doRewrite() (shouldShop bool) {
if len(this.rewriteRule.ProxyHost) > 0 {
host = this.rewriteRule.ProxyHost
}
this.doURL(this.RawReq.Method, this.rewriteReplace, host, 0)
this.doURL(this.RawReq.Method, this.rewriteReplace, host, 0, false)
return true
}

View File

@@ -1,10 +1,11 @@
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/logs"
"io"
"net/http"
"os"
)
@@ -17,12 +18,44 @@ func (this *HTTPRequest) doShutdown() {
}
if urlPrefixRegexp.MatchString(shutdown.URL) { // URL
this.doURL(http.MethodGet, shutdown.URL, "", shutdown.Status)
this.doURL(http.MethodGet, shutdown.URL, "", shutdown.Status, true)
return
}
// URL为空则显示文本 TODO 未来可以自定义文本
if len(shutdown.URL) == 0 {
if len(shutdown.BodyType) == 0 || shutdown.BodyType == shared.BodyTypeURL {
// URL为空则显示文本
if len(shutdown.URL) == 0 {
// 自定义响应Headers
if shutdown.Status > 0 {
this.processResponseHeaders(shutdown.Status)
this.writer.WriteHeader(shutdown.Status)
} else {
this.processResponseHeaders(http.StatusOK)
this.writer.WriteHeader(http.StatusOK)
}
_, err := this.writer.WriteString("The site have been shutdown.")
if err != nil {
logs.Error(err)
}
return
}
// 从本地文件中读取
file := Tea.Root + Tea.DS + shutdown.URL
fp, err := os.Open(file)
if err != nil {
logs.Error(err)
msg := "404 page not found: '" + shutdown.URL + "'"
this.writer.WriteHeader(http.StatusNotFound)
_, err = this.writer.Write([]byte(msg))
if err != nil {
logs.Error(err)
}
return
}
// 自定义响应Headers
if shutdown.Status > 0 {
this.processResponseHeaders(shutdown.Status)
@@ -31,51 +64,40 @@ func (this *HTTPRequest) doShutdown() {
this.processResponseHeaders(http.StatusOK)
this.writer.WriteHeader(http.StatusOK)
}
_, err := this.writer.WriteString("The site have been shutdown.")
buf := bytePool1k.Get()
_, err = utils.CopyWithFilter(this.writer, fp, buf, func(p []byte) []byte {
return []byte(this.Format(string(p)))
})
bytePool1k.Put(buf)
if err != nil {
logs.Error(err)
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
return
}
// 从本地文件中读取
// TODO 支持从数据库中读取文件
file := Tea.Root + Tea.DS + shutdown.URL
fp, err := os.Open(file)
if err != nil {
logs.Error(err)
msg := "404 page not found: '" + shutdown.URL + "'"
this.writer.WriteHeader(http.StatusNotFound)
_, err = this.writer.Write([]byte(msg))
err = fp.Close()
if err != nil {
logs.Error(err)
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "close file failed: "+err.Error())
}
return
}
// 自定义响应Headers
if shutdown.Status > 0 {
this.processResponseHeaders(shutdown.Status)
this.writer.WriteHeader(shutdown.Status)
} else {
this.processResponseHeaders(http.StatusOK)
this.writer.WriteHeader(http.StatusOK)
}
buf := bytePool1k.Get()
_, err = io.CopyBuffer(this.writer, fp, buf)
bytePool1k.Put(buf)
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error())
} else if shutdown.BodyType == shared.BodyTypeHTML {
// 自定义响应Headers
if shutdown.Status > 0 {
this.processResponseHeaders(shutdown.Status)
this.writer.WriteHeader(shutdown.Status)
} else {
this.processResponseHeaders(http.StatusOK)
this.writer.WriteHeader(http.StatusOK)
}
} else {
this.writer.SetOk()
}
err = fp.Close()
if err != nil {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "close file failed: "+err.Error())
_, err := this.writer.WriteString(this.Format(shutdown.Body))
if err != nil {
if !this.canIgnore(err) {
remotelogs.Warn("HTTP_REQUEST_SHUTDOWN", "write to client failed: "+err.Error())
}
} else {
this.writer.SetOk()
}
}
}

View File

@@ -9,6 +9,6 @@ func (this *HTTPRequest) doStat() {
}
// 内置的统计
stats.SharedHTTPRequestStatManager.AddRemoteAddr(this.Server.Id, this.requestRemoteAddr())
stats.SharedHTTPRequestStatManager.AddRemoteAddr(this.Server.Id, this.requestRemoteAddr(true))
stats.SharedHTTPRequestStatManager.AddUserAgent(this.Server.Id, this.requestHeader("User-Agent"))
}

View File

@@ -1,7 +1,6 @@
package nodes
import (
"errors"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/logs"
@@ -11,7 +10,7 @@ import (
)
// 请求某个URL
func (this *HTTPRequest) doURL(method string, url string, host string, statusCode int) {
func (this *HTTPRequest) doURL(method string, url string, host string, statusCode int, supportVariables bool) {
req, err := http.NewRequest(method, url, this.RawReq.Body)
if err != nil {
logs.Error(err)
@@ -35,7 +34,7 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
var client = utils.SharedHttpClient(60 * time.Second)
resp, err := client.Do(req)
if err != nil {
logs.Error(errors.New(req.URL.String() + ": " + err.Error()))
remotelogs.Error("HTTP_REQUEST_URL", req.URL.String()+": "+err.Error())
this.write50x(err, http.StatusInternalServerError)
return
}
@@ -50,6 +49,9 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
this.processResponseHeaders(statusCode)
}
if supportVariables {
resp.Header.Del("Content-Length")
}
this.writer.AddHeaders(resp.Header)
if statusCode <= 0 {
this.writer.Prepare(resp.ContentLength, resp.StatusCode)
@@ -67,7 +69,13 @@ func (this *HTTPRequest) doURL(method string, url string, host string, statusCod
// 输出内容
pool := this.bytePool(resp.ContentLength)
buf := pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
if supportVariables {
_, err = utils.CopyWithFilter(this.writer, resp.Body, buf, func(p []byte) []byte {
return []byte(this.Format(string(p)))
})
} else {
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
}
pool.Put(buf)
if err != nil {

View File

@@ -11,11 +11,21 @@ import (
"github.com/iwind/TeaGo/types"
"io"
"io/ioutil"
"net"
"net/http"
)
// 调用WAF
func (this *HTTPRequest) doWAFRequest() (blocked bool) {
// 当前连接是否已关闭
var conn = this.RawReq.Context().Value(HTTPConnContextKey)
if conn != nil {
trafficConn, ok := conn.(*TrafficConn)
if ok && trafficConn.IsClosed() {
return true
}
}
// 当前服务的独立设置
if this.web.FirewallPolicy != nil && this.web.FirewallPolicy.IsOn {
blocked, breakChecking := this.checkWAFRequest(this.web.FirewallPolicy)
@@ -43,7 +53,7 @@ func (this *HTTPRequest) doWAFRequest() (blocked bool) {
func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFirewallPolicy) (blocked bool, breakChecking bool) {
// 检查配置是否为空
if firewallPolicy == nil || !firewallPolicy.IsOn || firewallPolicy.Inbound == nil || !firewallPolicy.Inbound.IsOn {
if firewallPolicy == nil || !firewallPolicy.IsOn || firewallPolicy.Inbound == nil || !firewallPolicy.Inbound.IsOn || firewallPolicy.Mode == firewallconfigs.FirewallModeBypass {
return
}
@@ -57,7 +67,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
if ref.IsOn && ref.ListId > 0 {
list := iplibrary.SharedIPListManager.FindList(ref.ListId)
if list != nil {
found, _ := list.ContainsIPStrings(remoteAddrs)
_, found := list.ContainsIPStrings(remoteAddrs)
if found {
breakChecking = true
return
@@ -67,81 +77,85 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
}
// 检查IP黑名单
for _, ref := range inbound.AllDenyListRefs() {
if ref.IsOn && ref.ListId > 0 {
list := iplibrary.SharedIPListManager.FindList(ref.ListId)
if list != nil {
found, item := list.ContainsIPStrings(remoteAddrs)
if found {
// 触发事件
if item != nil && len(item.EventLevel) > 0 {
actions := iplibrary.SharedActionManager.FindEventActions(item.EventLevel)
for _, action := range actions {
goNext, err := action.DoHTTP(this.RawReq, this.RawWriter)
if err != nil {
remotelogs.Error("HTTP_REQUEST_WAF", "do action '"+err.Error()+"' failed: "+err.Error())
return true, false
}
if !goNext {
this.disableLog = true
return true, false
if firewallPolicy.Mode == firewallconfigs.FirewallModeDefend {
for _, ref := range inbound.AllDenyListRefs() {
if ref.IsOn && ref.ListId > 0 {
list := iplibrary.SharedIPListManager.FindList(ref.ListId)
if list != nil {
item, found := list.ContainsIPStrings(remoteAddrs)
if found {
// 触发事件
if item != nil && len(item.EventLevel) > 0 {
actions := iplibrary.SharedActionManager.FindEventActions(item.EventLevel)
for _, action := range actions {
goNext, err := action.DoHTTP(this.RawReq, this.RawWriter)
if err != nil {
remotelogs.Error("HTTP_REQUEST_WAF", "do action '"+err.Error()+"' failed: "+err.Error())
return true, false
}
if !goNext {
this.disableLog = true
return true, false
}
}
}
// TODO 需要记录日志信息
this.writer.WriteHeader(http.StatusForbidden)
this.writer.Close()
// 停止日志
this.disableLog = true
return true, false
}
// TODO 需要记录日志信息
this.writer.WriteHeader(http.StatusForbidden)
this.writer.Close()
// 停止日志
this.disableLog = true
return true, false
}
}
}
}
// 检查地区封禁
if iplibrary.SharedLibrary != nil {
if firewallPolicy.Inbound.Region != nil && firewallPolicy.Inbound.Region.IsOn {
regionConfig := firewallPolicy.Inbound.Region
if regionConfig.IsNotEmpty() {
for _, remoteAddr := range remoteAddrs {
result, err := iplibrary.SharedLibrary.Lookup(remoteAddr)
if err != nil {
remotelogs.Error("HTTP_REQUEST_WAF", "iplibrary lookup failed: "+err.Error())
} else if result != nil {
// 检查国家级别封禁
if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 {
countryId := iplibrary.SharedCountryManager.Lookup(result.Country)
if countryId > 0 && lists.ContainsInt64(regionConfig.DenyCountryIds, countryId) {
// TODO 可以配置对封禁的处理方式等
// TODO 需要记录日志信息
this.writer.WriteHeader(http.StatusForbidden)
this.writer.Close()
if firewallPolicy.Mode == firewallconfigs.FirewallModeDefend {
if iplibrary.SharedLibrary != nil {
if firewallPolicy.Inbound.Region != nil && firewallPolicy.Inbound.Region.IsOn {
regionConfig := firewallPolicy.Inbound.Region
if regionConfig.IsNotEmpty() {
for _, remoteAddr := range remoteAddrs {
result, err := iplibrary.SharedLibrary.Lookup(remoteAddr)
if err != nil {
remotelogs.Error("HTTP_REQUEST_WAF", "iplibrary lookup failed: "+err.Error())
} else if result != nil {
// 检查国家级别封禁
if len(regionConfig.DenyCountryIds) > 0 && len(result.Country) > 0 {
countryId := iplibrary.SharedCountryManager.Lookup(result.Country)
if countryId > 0 && lists.ContainsInt64(regionConfig.DenyCountryIds, countryId) {
// TODO 可以配置对封禁的处理方式等
// TODO 需要记录日志信息
this.writer.WriteHeader(http.StatusForbidden)
this.writer.Close()
// 停止日志
this.disableLog = true
// 停止日志
this.disableLog = true
return true, false
return true, false
}
}
}
// 检查省份封禁
if len(regionConfig.DenyProvinceIds) > 0 && len(result.Province) > 0 {
provinceId := iplibrary.SharedProvinceManager.Lookup(result.Province)
if provinceId > 0 && lists.ContainsInt64(regionConfig.DenyProvinceIds, provinceId) {
// TODO 可以配置对封禁的处理方式等
// TODO 需要记录日志信息
this.writer.WriteHeader(http.StatusForbidden)
this.writer.Close()
// 检查省份封禁
if len(regionConfig.DenyProvinceIds) > 0 && len(result.Province) > 0 {
provinceId := iplibrary.SharedProvinceManager.Lookup(result.Province)
if provinceId > 0 && lists.ContainsInt64(regionConfig.DenyProvinceIds, provinceId) {
// TODO 可以配置对封禁的处理方式等
// TODO 需要记录日志信息
this.writer.WriteHeader(http.StatusForbidden)
this.writer.Close()
// 停止日志
this.disableLog = true
// 停止日志
this.disableLog = true
return true, false
return true, false
}
}
}
}
@@ -184,7 +198,7 @@ func (this *HTTPRequest) checkWAFRequest(firewallPolicy *firewallconfigs.HTTPFir
stats.SharedHTTPRequestStatManager.AddFirewallRuleGroupId(this.Server.Id, this.firewallRuleGroupId, ruleSet.Actions)
}
this.firewallActions = ruleSet.ActionCodes()
this.firewallActions = append(ruleSet.ActionCodes(), firewallPolicy.Mode)
}
return !goNext, false
@@ -211,7 +225,7 @@ func (this *HTTPRequest) doWAFResponse(resp *http.Response) (blocked bool) {
}
func (this *HTTPRequest) checkWAFResponse(firewallPolicy *firewallconfigs.HTTPFirewallPolicy, resp *http.Response) (blocked bool) {
if firewallPolicy == nil || !firewallPolicy.IsOn || !firewallPolicy.Outbound.IsOn {
if firewallPolicy == nil || !firewallPolicy.IsOn || !firewallPolicy.Outbound.IsOn || firewallPolicy.Mode == firewallconfigs.FirewallModeBypass {
return
}
@@ -248,7 +262,7 @@ func (this *HTTPRequest) checkWAFResponse(firewallPolicy *firewallconfigs.HTTPFi
stats.SharedHTTPRequestStatManager.AddFirewallRuleGroupId(this.Server.Id, this.firewallRuleGroupId, ruleSet.Actions)
}
this.firewallActions = ruleSet.ActionCodes()
this.firewallActions = append(ruleSet.ActionCodes(), firewallPolicy.Mode)
}
return !goNext
@@ -261,7 +275,7 @@ func (this *HTTPRequest) WAFRaw() *http.Request {
// WAFRemoteIP 客户端IP
func (this *HTTPRequest) WAFRemoteIP() string {
return this.requestRemoteAddr()
return this.requestRemoteAddr(true)
}
// WAFGetCacheBody 获取缓存中的Body
@@ -296,3 +310,17 @@ func (this *HTTPRequest) WAFRestoreBody(data []byte) {
func (this *HTTPRequest) WAFServerId() int64 {
return this.Server.Id
}
// WAFClose 关闭连接
func (this *HTTPRequest) WAFClose() {
requestConn := this.RawReq.Context().Value(HTTPConnContextKey)
if requestConn == nil {
return
}
conn, ok := requestConn.(net.Conn)
if ok {
_ = conn.Close()
return
}
return
}

View File

@@ -3,33 +3,57 @@ package nodes
import (
"bufio"
"bytes"
"compress/gzip"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/caches"
"github.com/TeaOSLab/EdgeNode/internal/compressions"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/chai2010/webp"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/types"
_ "golang.org/x/image/bmp"
_ "golang.org/x/image/webp"
"image"
_ "image/gif"
_ "image/jpeg"
_ "image/png"
"io"
"net"
"net/http"
"path/filepath"
"strings"
"sync/atomic"
)
// 限制WebP能够同时使用的Buffer内存使用量
const webpMaxBufferSize int64 = 1_000_000_000
const webpSuffix = "@GOEDGE_WEBP"
var webpTotalBufferSize int64 = 0
var webpBufferPool = utils.NewBufferPool(1024)
// HTTPWriter 响应Writer
type HTTPWriter struct {
req *HTTPRequest
writer http.ResponseWriter
gzipConfig *serverconfigs.HTTPGzipConfig
gzipWriter *gzip.Writer
size int64
webpIsEncoding bool
webpBuffer *bytes.Buffer
webpIsWriting bool
compressionConfig *serverconfigs.HTTPCompressionConfig
compressionWriter compressions.Writer
compressionType serverconfigs.HTTPCompressionType
statusCode int
sentBodyBytes int64
bodyCopying bool
body []byte
gzipBodyBuffer *bytes.Buffer // 当使用gzip压缩时使用
gzipBodyWriter *gzip.Writer // 当使用gzip压缩时使用
bodyCopying bool
body []byte
compressionBodyBuffer *bytes.Buffer // 当使用压缩时使用
compressionBodyWriter compressions.Writer // 当使用压缩时使用
cacheWriter caches.Writer // 缓存写入
cacheStorage caches.StorageInterface
@@ -49,29 +73,39 @@ func NewHTTPWriter(req *HTTPRequest, httpResponseWriter http.ResponseWriter) *HT
func (this *HTTPWriter) Reset(httpResponseWriter http.ResponseWriter) {
this.writer = httpResponseWriter
this.gzipConfig = nil
this.gzipWriter = nil
this.compressionConfig = nil
this.compressionWriter = nil
this.statusCode = 0
this.sentBodyBytes = 0
this.bodyCopying = false
this.body = nil
this.gzipBodyBuffer = nil
this.gzipBodyWriter = nil
this.compressionBodyBuffer = nil
this.compressionBodyWriter = nil
}
// Gzip 设置Gzip
func (this *HTTPWriter) Gzip(config *serverconfigs.HTTPGzipConfig) {
this.gzipConfig = config
// SetCompression 设置内容压缩配置
func (this *HTTPWriter) SetCompression(config *serverconfigs.HTTPCompressionConfig) {
this.compressionConfig = config
}
// Prepare 准备输出
// 缓存不调用此函数
func (this *HTTPWriter) Prepare(size int64, status int) {
this.size = size
this.statusCode = status
this.prepareGzip(size)
if status == http.StatusOK {
this.prepareWebP(size)
}
this.prepareCache(size)
// 在WebP模式下压缩暂不可用
if !this.webpIsEncoding {
this.PrepareCompression(size)
}
}
// Raw 包装前的原始的Writer
@@ -104,40 +138,46 @@ func (this *HTTPWriter) AddHeaders(header http.Header) {
// Write 写入数据
func (this *HTTPWriter) Write(data []byte) (n int, err error) {
if this.writer != nil {
if this.gzipWriter != nil {
n, err = this.gzipWriter.Write(data)
} else {
n, err = this.writer.Write(data)
}
if n > 0 {
this.sentBodyBytes += int64(n)
}
n = len(data)
// 写入缓存
if this.cacheWriter != nil {
_, err = this.cacheWriter.Write(data)
if err != nil {
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
}
} else {
if n == 0 {
n = len(data) // 防止出现short write错误
}
}
if this.bodyCopying {
if this.gzipBodyWriter != nil {
_, err := this.gzipBodyWriter.Write(data)
if err != nil {
remotelogs.Error("HTTP_WRITER", err.Error())
}
if this.writer != nil {
if this.webpIsEncoding && !this.webpIsWriting {
this.webpBuffer.Write(data)
} else {
this.body = append(this.body, data...)
// 写入压缩
var n1 int
if this.compressionWriter != nil {
n1, err = this.compressionWriter.Write(data)
} else {
n1, err = this.writer.Write(data)
}
if n1 > 0 {
this.sentBodyBytes += int64(n1)
}
// 写入缓存
if this.cacheWriter != nil {
_, err = this.cacheWriter.Write(data)
if err != nil {
_ = this.cacheWriter.Discard()
this.cacheWriter = nil
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
}
}
if this.bodyCopying {
if this.compressionBodyWriter != nil {
_, err := this.compressionBodyWriter.Write(data)
if err != nil {
remotelogs.Error("HTTP_WRITER", err.Error())
}
} else {
this.body = append(this.body, data...)
}
}
}
}
return
}
@@ -211,14 +251,64 @@ func (this *HTTPWriter) SetOk() {
// Close 关闭
func (this *HTTPWriter) Close() {
// gzip writer
if this.gzipWriter != nil {
if this.bodyCopying && this.gzipBodyWriter != nil {
_ = this.gzipBodyWriter.Close()
this.body = this.gzipBodyBuffer.Bytes()
if this.webpIsEncoding {
defer func() {
atomic.AddInt64(&webpTotalBufferSize, -this.size*32)
webpBufferPool.Put(this.webpBuffer)
}()
}
// webp writer
if this.isOk && this.webpIsEncoding {
var bufferLen = int64(this.webpBuffer.Len())
atomic.AddInt64(&webpTotalBufferSize, bufferLen*8)
imageData, _, err := image.Decode(this.webpBuffer)
if err != nil {
_, _ = io.Copy(this.writer, this.webpBuffer)
// 处理缓存
if this.cacheWriter != nil {
_ = this.cacheWriter.Discard()
}
this.cacheWriter = nil
} else {
var f = types.Float32(this.req.web.WebP.Quality)
if f > 100 {
f = 100
}
this.webpIsWriting = true
err = webp.Encode(this, imageData, &webp.Options{
Lossless: false,
Quality: f,
Exact: true,
})
if err != nil {
if !this.req.canIgnore(err) {
remotelogs.Error("HTTP_WRITER", "encode webp failed: "+err.Error())
}
// 处理缓存
if this.cacheWriter != nil {
_ = this.cacheWriter.Discard()
}
this.cacheWriter = nil
}
}
_ = this.gzipWriter.Close()
this.gzipWriter = nil
atomic.AddInt64(&webpTotalBufferSize, -bufferLen*8)
this.webpBuffer.Reset()
}
// compression writer
if this.compressionWriter != nil {
if this.bodyCopying && this.compressionBodyWriter != nil {
_ = this.compressionBodyWriter.Close()
this.body = this.compressionBodyBuffer.Bytes()
}
_ = this.compressionWriter.Close()
this.compressionWriter = nil
}
// cache writer
@@ -271,35 +361,29 @@ func (this *HTTPWriter) Flush() {
}
}
// 准备Gzip
func (this *HTTPWriter) prepareGzip(size int64) {
if this.gzipConfig == nil || this.gzipConfig.Level <= 0 {
return
}
// 准备Webp
func (this *HTTPWriter) prepareWebP(size int64) {
if this.req.web != nil &&
this.req.web.WebP != nil &&
this.req.web.WebP.IsOn &&
this.req.web.WebP.MatchResponse(this.Header().Get("Content-Type"), size, filepath.Ext(this.req.requestPath()), this.req.Format) &&
this.req.web.WebP.MatchAccept(this.req.requestHeader("Accept")) &&
len(this.writer.Header().Get("Content-Encoding")) == 0 &&
atomic.LoadInt64(&webpTotalBufferSize) < webpMaxBufferSize {
this.webpIsEncoding = true
this.webpBuffer = webpBufferPool.Get()
// 判断Accept是否支持gzip
if !strings.Contains(this.req.requestHeader("Accept-Encoding"), "gzip") {
return
}
this.Header().Del("Content-Length")
this.Header().Set("Content-Type", "image/webp")
// 尺寸和类型
if size < this.gzipConfig.MinBytes() || (this.gzipConfig.MaxBytes() > 0 && size > this.gzipConfig.MaxBytes()) {
return
atomic.AddInt64(&webpTotalBufferSize, size*32)
}
}
// 校验其他条件
if this.gzipConfig.Conds != nil {
if len(this.gzipConfig.Conds.Groups) > 0 {
if !this.gzipConfig.Conds.MatchRequest(this.req.Format) || !this.gzipConfig.Conds.MatchResponse(this.req.Format) {
return
}
} else {
// 默认校验文档类型
contentType := this.writer.Header().Get("Content-Type")
if len(contentType) > 0 && (!strings.HasPrefix(contentType, "text/") && !strings.HasPrefix(contentType, "application/")) {
return
}
}
// PrepareCompression 准备压缩
func (this *HTTPWriter) PrepareCompression(size int64) {
if this.compressionConfig == nil || !this.compressionConfig.IsOn || this.compressionConfig.Level <= 0 {
return
}
// 如果已经有编码则不处理
@@ -307,9 +391,21 @@ func (this *HTTPWriter) prepareGzip(size int64) {
return
}
// gzip writer
// 尺寸和类型
if !this.compressionConfig.MatchResponse(this.Header().Get("Content-Type"), size, filepath.Ext(this.req.requestPath()), this.req.Format) {
return
}
// 判断Accept是否支持压缩
compressionType, compressionEncoding, ok := this.compressionConfig.MatchAcceptEncoding(this.req.RawReq.Header.Get("Accept-Encoding"))
if !ok {
return
}
this.compressionType = compressionType
// compression writer
var err error = nil
this.gzipWriter, err = gzip.NewWriterLevel(this.writer, int(this.gzipConfig.Level))
this.compressionWriter, err = compressions.NewWriter(this.writer, compressionType, int(this.compressionConfig.Level))
if err != nil {
remotelogs.Error("HTTP_WRITER", err.Error())
return
@@ -317,16 +413,15 @@ func (this *HTTPWriter) prepareGzip(size int64) {
// body copy
if this.bodyCopying {
this.gzipBodyBuffer = bytes.NewBuffer([]byte{})
this.gzipBodyWriter, err = gzip.NewWriterLevel(this.gzipBodyBuffer, int(this.gzipConfig.Level))
this.compressionBodyBuffer = bytes.NewBuffer([]byte{})
this.compressionBodyWriter, err = compressions.NewWriter(this.compressionBodyBuffer, compressionType, int(this.compressionConfig.Level))
if err != nil {
remotelogs.Error("HTTP_WRITER", err.Error())
}
}
header := this.writer.Header()
header.Set("Content-Encoding", "gzip")
header.Set("Transfer-Encoding", "chunked")
header.Set("Content-Encoding", compressionEncoding)
header.Set("Vary", "Accept-Encoding")
header.Del("Content-Length")
}
@@ -357,7 +452,7 @@ func (this *HTTPWriter) prepareCache(size int64) {
return
}
if size >= 0 && ((cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) ||
(cachePolicy.MaxSizeBytes() > 0 && size > cachePolicy.MaxSizeBytes())) {
(cachePolicy.MaxSizeBytes() > 0 && size > cachePolicy.MaxSizeBytes()) || (cacheRef.MinSizeBytes() > size)) {
return
}
@@ -400,7 +495,11 @@ func (this *HTTPWriter) prepareCache(size int64) {
life = 60
}
expiredAt := utils.UnixTime() + life
cacheWriter, err := storage.OpenWriter(this.req.cacheKey, expiredAt, this.StatusCode())
var cacheKey = this.req.cacheKey
if this.webpIsEncoding {
cacheKey += webpSuffix
}
cacheWriter, err := storage.OpenWriter(cacheKey, expiredAt, this.StatusCode())
if err != nil {
if !caches.CanIgnoreErr(err) {
remotelogs.Error("HTTP_WRITER", "write cache failed: "+err.Error())
@@ -408,9 +507,6 @@ func (this *HTTPWriter) prepareCache(size int64) {
return
}
this.cacheWriter = cacheWriter
if this.gzipWriter != nil {
this.cacheWriter = caches.NewGzipWriter(this.cacheWriter, this.req.cacheKey, expiredAt)
}
// 写入Header
for k, v := range this.Header() {

View File

@@ -11,7 +11,7 @@ import (
)
type Listener struct {
group *serverconfigs.ServerGroup
group *serverconfigs.ServerAddressGroup
isListening bool
listener ListenerInterface // 监听器
@@ -22,7 +22,7 @@ func NewListener() *Listener {
return &Listener{}
}
func (this *Listener) Reload(group *serverconfigs.ServerGroup) {
func (this *Listener) Reload(group *serverconfigs.ServerAddressGroup) {
this.locker.Lock()
this.group = group
if this.listener != nil {

View File

@@ -16,7 +16,7 @@ type BaseListener struct {
namedServersLocker sync.RWMutex
namedServers map[string]*NamedServer // 域名 => server
Group *serverconfigs.ServerGroup
Group *serverconfigs.ServerAddressGroup
countActiveConnections int64 // 当前活跃的连接数
}

View File

@@ -1,6 +1,7 @@
package nodes
import (
"context"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"golang.org/x/net/http2"
@@ -18,6 +19,12 @@ var httpErrorLogger = log.New(io.Discard, "", 0)
var metricNewConnMap = map[string]bool{} // remoteAddr => bool
var metricNewConnMapLocker = &sync.Mutex{}
type contextKey struct {
key string
}
var HTTPConnContextKey = &contextKey{key: "http-conn"}
type HTTPListener struct {
BaseListener
@@ -65,6 +72,9 @@ func (this *HTTPListener) Serve() error {
metricNewConnMapLocker.Unlock()
}
},
ConnContext: func(ctx context.Context, c net.Conn) context.Context {
return context.WithValue(ctx, HTTPConnContextKey, c)
},
}
this.httpServer.SetKeepAlivesEnabled(true)
@@ -102,7 +112,7 @@ func (this *HTTPListener) Close() error {
return this.Listener.Close()
}
func (this *HTTPListener) Reload(group *serverconfigs.ServerGroup) {
func (this *HTTPListener) Reload(group *serverconfigs.ServerAddressGroup) {
this.Group = group
this.Reset()

View File

@@ -14,7 +14,7 @@ type ListenerInterface interface {
Close() error
// Reload 重载配置
Reload(serverGroup *serverconfigs.ServerGroup)
Reload(serverGroup *serverconfigs.ServerAddressGroup)
// CountActiveListeners 获取当前活跃的连接数
CountActiveListeners() int

View File

@@ -70,7 +70,7 @@ func (this *ListenerManager) Start(node *nodeconfigs.NodeConfig) error {
groupAddrs := []string{}
availableServerGroups := node.AvailableGroups()
if !node.IsOn {
availableServerGroups = []*serverconfigs.ServerGroup{}
availableServerGroups = []*serverconfigs.ServerAddressGroup{}
}
if len(availableServerGroups) == 0 {

View File

@@ -42,7 +42,7 @@ func (this *TCPListener) Serve() error {
return nil
}
func (this *TCPListener) Reload(group *serverconfigs.ServerGroup) {
func (this *TCPListener) Reload(group *serverconfigs.ServerAddressGroup) {
this.Group = group
this.Reset()
}
@@ -55,6 +55,24 @@ func (this *TCPListener) handleConn(conn net.Conn) error {
if firstServer.ReverseProxy == nil {
return errors.New("no ReverseProxy configured for the server")
}
// 记录域名排行
tlsConn, ok := conn.(*tls.Conn)
var recordStat = false
if ok {
var serverName = tlsConn.ConnectionState().ServerName
if len(serverName) > 0 {
// 统计
stats.SharedTrafficStatManager.Add(firstServer.Id, serverName, 0, 0, 1, 0, 0, 0)
recordStat = true
}
}
// 统计
if !recordStat {
stats.SharedTrafficStatManager.Add(firstServer.Id, "", 0, 0, 1, 0, 0, 0)
}
originConn, err := this.connectOrigin(firstServer.ReverseProxy, conn.RemoteAddr().String())
if err != nil {
return err

View File

@@ -84,7 +84,7 @@ func (this *UDPListener) Close() error {
return this.Listener.Close()
}
func (this *UDPListener) Reload(group *serverconfigs.ServerGroup) {
func (this *UDPListener) Reload(group *serverconfigs.ServerAddressGroup) {
this.Group = group
this.Reset()
}
@@ -147,6 +147,10 @@ func NewUDPConn(serverId int64, addr net.Addr, proxyConn *net.UDPConn, serverCon
activatedAt: time.Now().Unix(),
isOk: true,
}
// 统计
stats.SharedTrafficStatManager.Add(serverId, "", 0, 0, 1, 0, 0, 0)
go func() {
buffer := bytePool32k.Get()
defer func() {

View File

@@ -22,7 +22,7 @@ func (this *UnixListener) Close() error {
return nil
}
func (this *UnixListener) Reload(group *serverconfigs.ServerGroup) {
func (this *UnixListener) Reload(group *serverconfigs.ServerAddressGroup) {
this.Group = group
this.Reset()
}

View File

@@ -27,11 +27,14 @@ import (
"os"
"os/exec"
"runtime"
"sync"
"time"
)
var sharedNodeConfig *nodeconfigs.NodeConfig
var nodeTaskNotify = make(chan bool, 8)
var nodeConfigChangedNotify = make(chan bool, 8)
var nodeConfigUpdatedAt int64
var DaemonIsOn = false
var DaemonPid = 0
@@ -39,6 +42,7 @@ var DaemonPid = 0
type Node struct {
isLoaded bool
sock *gosock.Sock
locker sync.Mutex
}
func NewNode() *Node {
@@ -280,6 +284,9 @@ func (this *Node) loop() error {
// 读取API配置
func (this *Node) syncConfig() error {
this.locker.Lock()
defer this.locker.Unlock()
// 检查api.yaml是否存在
apiConfigFile := Tea.ConfigFile("api.yaml")
_, err := os.Stat(apiConfigFile)
@@ -290,7 +297,7 @@ func (this *Node) syncConfig() error {
if os.IsNotExist(clusterErr) {
return err
}
return clusterErr
return errors.New("check cluster config failed: " + clusterErr.Error())
}
} else {
return err
@@ -315,6 +322,7 @@ func (this *Node) syncConfig() error {
if !configResp.IsChanged {
return nil
}
nodeConfigUpdatedAt = time.Now().Unix()
configJSON := configResp.NodeJSON
nodeConfig := &nodeconfigs.NodeConfig{}
@@ -347,7 +355,7 @@ func (this *Node) syncConfig() error {
} else {
remotelogs.Println("NODE", "loading config ...")
}
nodeconfigs.ResetNodeConfig(nodeConfig)
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
@@ -398,6 +406,12 @@ func (this *Node) startSyncTimer() {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
case <-nodeConfigChangedNotify:
err := this.syncConfig()
if err != nil {
remotelogs.Error("NODE", "sync config error: "+err.Error())
continue
}
}
}
}()
@@ -425,7 +439,7 @@ func (this *Node) checkClusterConfig() error {
return err
}
logs.Println("[NODE]registering node ...")
logs.Println("[NODE]registering node to cluster ...")
resp, err := rpcClient.NodeRPC().RegisterClusterNode(rpcClient.ClusterContext(config.ClusterId, config.Secret), &pb.RegisterClusterNodeRequest{Name: HOSTNAME})
if err != nil {
return err

View File

@@ -1,3 +1,4 @@
//go:build !windows
// +build !windows
package nodes

View File

@@ -44,7 +44,8 @@ func init() {
// TrafficConn 用于统计流量的连接
type TrafficConn struct {
rawConn net.Conn
rawConn net.Conn
isClosed bool
}
func NewTrafficConn(conn net.Conn) net.Conn {
@@ -68,6 +69,7 @@ func (this *TrafficConn) Write(b []byte) (n int, err error) {
}
func (this *TrafficConn) Close() error {
this.isClosed = true
return this.rawConn.Close()
}
@@ -90,3 +92,7 @@ func (this *TrafficConn) SetReadDeadline(t time.Time) error {
func (this *TrafficConn) SetWriteDeadline(t time.Time) error {
return this.rawConn.SetWriteDeadline(t)
}
func (this *TrafficConn) IsClosed() bool {
return this.isClosed
}

View File

@@ -24,7 +24,7 @@ func (this *TrafficListener) Accept() (net.Conn, error) {
// 是否在WAF名单中
ip, _, err := net.SplitHostPort(conn.RemoteAddr().String())
if err == nil {
if !waf.SharedIPWhiteList.Contains(waf.IPTypeAll, ip) && waf.SharedIPBlackLIst.Contains(waf.IPTypeAll, ip) {
if !waf.SharedIPWhiteList.Contains(waf.IPTypeAll, ip) && waf.SharedIPBlackList.Contains(waf.IPTypeAll, ip) {
defer func() {
_ = conn.Close()
}()

View File

@@ -57,10 +57,14 @@ func (this *WAFManager) convertWAF(policy *firewallconfigs.HTTPFirewallPolicy) (
if policy == nil {
return nil, errors.New("policy should not be nil")
}
if len(policy.Mode) == 0 {
policy.Mode = firewallconfigs.FirewallModeDefend
}
w := &waf.WAF{
Id: strconv.FormatInt(policy.Id, 10),
IsOn: policy.IsOn,
Name: policy.Name,
Mode: policy.Mode,
}
// inbound

View File

@@ -87,7 +87,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
// Add 添加流量
func (this *TrafficStatManager) Add(serverId int64, domain string, bytes int64, cachedBytes int64, countRequests int64, countCachedRequests int64, countAttacks int64, attackBytes int64) {
if bytes == 0 {
if bytes == 0 && countRequests == 0 {
return
}

View File

@@ -24,7 +24,13 @@ type Cache struct {
func NewCache(opt ...OptionInterface) *Cache {
countPieces := 128
maxItems := 1_000_000
maxItems := 2_000_000
var delta = systemMemoryGB() / 4
if delta > 0 {
maxItems *= delta
}
for _, option := range opt {
if option == nil {
continue
@@ -61,7 +67,7 @@ func NewCache(opt ...OptionInterface) *Cache {
return cache
}
func (this *Cache) Write(key string, value interface{}, expiredAt int64) {
func (this *Cache) Write(key string, value interface{}, expiredAt int64) (ok bool) {
if this.isDestroyed {
return
}
@@ -77,7 +83,7 @@ func (this *Cache) Write(key string, value interface{}, expiredAt int64) {
}
uint64Key := HashKey([]byte(key))
pieceIndex := uint64Key % this.countPieces
this.pieces[pieceIndex].Add(uint64Key, &Item{
return this.pieces[pieceIndex].Add(uint64Key, &Item{
Value: value,
expiredAt: expiredAt,
})

View File

@@ -25,6 +25,16 @@ func TestNewCache(t *testing.T) {
t.Log(cache.Read("a"))
time.Sleep(2 * time.Second)
t.Log(cache.Read("d"))
t.Log(cache.Count(), "items")
}
func TestCache_Memory(t *testing.T) {
cache := NewCache()
for i := 0; i < 20_000_000; i++ {
cache.Write("a"+strconv.Itoa(i), 1, time.Now().Unix()+3600)
}
t.Log("waiting ...")
time.Sleep(10 * time.Second)
}
func BenchmarkCache_Add(b *testing.B) {
@@ -65,6 +75,7 @@ func TestCache_Read(t *testing.T) {
for i := 0; i < 10_000_000; i++ {
cache.Write("HELLO_WORLD_"+strconv.Itoa(i), i, time.Now().Unix()+int64(i%10240)+1)
}
time.Sleep(10 * time.Second)
total := 0
for _, piece := range cache.pieces {

View File

@@ -17,7 +17,7 @@ func NewPiece(maxItems int) *Piece {
return &Piece{m: map[uint64]*Item{}, maxItems: maxItems}
}
func (this *Piece) Add(key uint64, item *Item) {
func (this *Piece) Add(key uint64, item *Item) (ok bool) {
this.locker.Lock()
if len(this.m) >= this.maxItems {
this.locker.Unlock()
@@ -25,6 +25,7 @@ func (this *Piece) Add(key uint64, item *Item) {
}
this.m[key] = item
this.locker.Unlock()
return true
}
func (this *Piece) IncreaseInt64(key uint64, delta int64, expiredAt int64) (result int64) {

View File

@@ -0,0 +1,23 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package ttlcache
import (
"github.com/shirou/gopsutil/mem"
)
var systemTotalMemory = -1
func systemMemoryGB() int {
if systemTotalMemory > 0 {
return systemTotalMemory
}
stat, err := mem.VirtualMemory()
if err != nil {
return 0
}
systemTotalMemory = int(stat.Total / 1024 / 1024 / 1024)
return systemTotalMemory
}

View File

@@ -0,0 +1,11 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package ttlcache
import "testing"
func TestSystemMemoryGB(t *testing.T) {
t.Log(systemMemoryGB())
t.Log(systemMemoryGB())
t.Log(systemMemoryGB())
}

View File

@@ -0,0 +1,48 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package utils
import "bytes"
// BufferPool pool for get byte slice
type BufferPool struct {
c chan *bytes.Buffer
}
// NewBufferPool 创建新对象
func NewBufferPool(maxSize int) *BufferPool {
if maxSize <= 0 {
maxSize = 1024
}
pool := &BufferPool{
c: make(chan *bytes.Buffer, maxSize),
}
return pool
}
// Get 获取一个新的Buffer
func (this *BufferPool) Get() (b *bytes.Buffer) {
select {
case b = <-this.c:
b.Reset()
default:
b = &bytes.Buffer{}
}
return
}
// Put 放回一个使用过的byte slice
func (this *BufferPool) Put(b *bytes.Buffer) {
b.Reset()
select {
case this.c <- b:
default:
// 已达最大容量,则抛弃
}
}
// Size 当前的数量
func (this *BufferPool) Size() int {
return len(this.c)
}

View File

@@ -0,0 +1,25 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package utils
import "io"
func CopyWithFilter(writer io.Writer, reader io.Reader, buf []byte, filter func(p []byte) []byte) (written int64, err error) {
for {
n, err := reader.Read(buf)
if n > 0 {
n2, err := writer.Write(filter(buf[:n]))
written += int64(n2)
if err != nil {
return written, err
}
}
if err != nil {
if err == io.EOF {
break
}
return written, err
}
}
return written, nil
}

View File

@@ -1,7 +1,6 @@
package utils
import (
"github.com/iwind/TeaGo/logs"
"sync"
"testing"
"time"
@@ -14,7 +13,7 @@ func TestTicker(t *testing.T) {
ticker.Stop()
}()
for ticker.Next() {
logs.Println("tick")
t.Log("tick")
}
t.Log("finished")
}
@@ -26,10 +25,10 @@ func TestTicker2(t *testing.T) {
ticker.Stop()
}()
for {
logs.Println("loop")
t.Log("loop")
select {
case <-ticker.C:
logs.Println("tick")
t.Log("tick")
case <-ticker.S:
return
}
@@ -42,7 +41,7 @@ func TestTickerEvery(t *testing.T) {
wg.Add(1)
Every(2*time.Second, func(ticker *Ticker) {
i++
logs.Println("TestTickerEvery i:", i)
t.Log("TestTickerEvery i:", i)
if i >= 4 {
ticker.Stop()
wg.Done()

View File

@@ -57,23 +57,16 @@ func (this *BlockAction) WillChange() bool {
}
func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, request requests.Request, writer http.ResponseWriter) (allow bool) {
if this.Timeout > 0 {
// 加入到黑名单
SharedIPBlackLIst.Add(IPTypeAll, request.WAFRemoteIP(), time.Now().Unix()+int64(this.Timeout))
// 加入到黑名单
var timeout = this.Timeout
if timeout <= 0 {
timeout = 60 // 默认封锁60秒
}
SharedIPBlackList.Add(IPTypeAll, request.WAFRemoteIP(), time.Now().Unix()+int64(timeout))
if writer != nil {
// close the connection
defer func() {
hijack, ok := writer.(http.Hijacker)
if ok {
conn, _, _ := hijack.Hijack()
if conn != nil {
_ = conn.Close()
return
}
}
}()
defer request.WAFClose()
// output response
if this.StatusCode > 0 {
@@ -126,5 +119,6 @@ func (this *BlockAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, reque
_, _ = writer.Write([]byte("The request is blocked by " + teaconst.ProductName))
}
}
return false
}

View File

@@ -92,7 +92,7 @@ func (this *RecordIPAction) Perform(waf *WAF, group *RuleGroup, set *RuleSet, re
if this.Type == "black" {
_ = this.CloseConn(writer)
SharedIPBlackLIst.Add(IPTypeAll, request.WAFRemoteIP(), expiredAt)
SharedIPBlackList.Add(IPTypeAll, request.WAFRemoteIP(), expiredAt)
} else {
// 加入本地白名单
timeout := this.Timeout

View File

@@ -9,7 +9,7 @@ import (
)
var SharedIPWhiteList = NewIPList()
var SharedIPBlackLIst = NewIPList()
var SharedIPBlackList = NewIPList()
const IPTypeAll = "*"

View File

@@ -26,6 +26,9 @@ type Request interface {
// WAFServerId 服务ID
WAFServerId() int64
// WAFClose 关闭当前请求所在的连接
WAFClose()
// Format 格式化变量
Format(string) string
}

View File

@@ -66,6 +66,10 @@ func (this *TestRequest) WAFServerId() int64 {
return 0
}
// WAFClose 关闭当前请求所在的连接
func (this *TestRequest) WAFClose() {
}
func (this *TestRequest) Format(s string) string {
return s
}

View File

@@ -1,12 +1,14 @@
package waf
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/utils/string"
"net/http"
"sort"
)
type RuleConnector = string
@@ -74,6 +76,18 @@ func (this *RuleSet) Init(waf *WAF) error {
}
}
// sort actions
sort.Slice(this.actionInstances, func(i, j int) bool {
var instance1 = this.actionInstances[i]
if !instance1.WillChange() {
return true
}
if instance1.Code() == ActionRecordIP {
return true
}
return false
})
return nil
}
@@ -117,6 +131,10 @@ func (this *RuleSet) ActionCodes() []string {
}
func (this *RuleSet) PerformActions(waf *WAF, group *RuleGroup, req requests.Request, writer http.ResponseWriter) bool {
if len(waf.Mode) != 0 && waf.Mode != firewallconfigs.FirewallModeDefend {
return true
}
// 先执行allow
for _, instance := range this.actionInstances {
if !instance.WillChange() {

View File

@@ -2,6 +2,7 @@ package waf
import (
"errors"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/firewallconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/waf/checkpoints"
"github.com/TeaOSLab/EdgeNode/internal/waf/requests"
@@ -15,12 +16,13 @@ import (
)
type WAF struct {
Id string `yaml:"id" json:"id"`
IsOn bool `yaml:"isOn" json:"isOn"`
Name string `yaml:"name" json:"name"`
Inbound []*RuleGroup `yaml:"inbound" json:"inbound"`
Outbound []*RuleGroup `yaml:"outbound" json:"outbound"`
CreatedVersion string `yaml:"createdVersion" json:"createdVersion"`
Id string `yaml:"id" json:"id"`
IsOn bool `yaml:"isOn" json:"isOn"`
Name string `yaml:"name" json:"name"`
Inbound []*RuleGroup `yaml:"inbound" json:"inbound"`
Outbound []*RuleGroup `yaml:"outbound" json:"outbound"`
CreatedVersion string `yaml:"createdVersion" json:"createdVersion"`
Mode firewallconfigs.FirewallMode `yaml:"mode" json:"mode"`
DefaultBlockAction *BlockAction