Compare commits

...

13 Commits

Author SHA1 Message Date
刘祥超
b21bb8ee62 更新版本号为v0.1 2021-05-13 14:37:40 +08:00
刘祥超
8caa03175c 节点状态中增加缓存用量数据 2021-05-13 11:50:36 +08:00
刘祥超
d9d06b7be9 节点可以单独设置缓存的磁盘、内存容量 2021-05-12 21:38:44 +08:00
刘祥超
1192f15676 访问日志中增加缓存状态 2021-05-12 16:31:28 +08:00
刘祥超
ebf4d41290 支持自动添加X-Cache Header 2021-05-12 16:10:03 +08:00
刘祥超
a9ec78afb4 增加编译脚本 2021-05-12 15:07:43 +08:00
刘祥超
4526633027 服务支持fastcgi;路径规则支持匹配后缀 2021-05-10 21:13:18 +08:00
刘祥超
c7ddd0adda 实现基本的监控 2021-04-29 16:48:47 +08:00
刘祥超
ca07a6141b 标准化一些注释 2021-04-19 19:29:32 +08:00
刘祥超
6d0f90747e 修复在HTTP/2中反向代理出现的411错误 2021-04-19 19:28:18 +08:00
刘祥超
bce8fd5ea3 修复因为WAF而导致Content-Length没有显式设置的问题 2021-04-19 13:11:27 +08:00
刘祥超
d1fcbb46a3 缓存设置中增加“支持分片内容”选项,用来支持Chunked内容 2021-04-18 22:17:17 +08:00
刘祥超
d24f53477a 变更版本号 2021-04-18 21:25:20 +08:00
28 changed files with 682 additions and 134 deletions

View File

@@ -6,3 +6,4 @@
./build.sh linux mips64
./build.sh linux mips64le
./build.sh darwin amd64
./build.sh darwin arm64

1
go.mod
View File

@@ -13,6 +13,7 @@ require (
github.com/go-yaml/yaml v2.1.0+incompatible
github.com/golang/protobuf v1.4.2
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7
github.com/lionsoul2014/ip2region v2.2.0-release+incompatible
github.com/mssola/user_agent v0.5.2
github.com/shirou/gopsutil v2.20.9+incompatible

2
go.sum
View File

@@ -58,6 +58,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/iwind/TeaGo v0.0.0-20200923021120-f5d76441fe9e/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f h1:6Ws2H+eorfVUoMO2jta6A9nIdh8oi5/5LXo/LkAxR+E=
github.com/iwind/TeaGo v0.0.0-20201020081413-7cf62d6f420f/go.mod h1:KU4mS7QNiZ7QWEuDBk1zw0/Q2LrAPZv3tycEFBsuUwc=
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7 h1:apv23QzWNmv0D76gB3+u/5kf0F/Yw4W8h489CWUZtss=
github.com/iwind/gofcgi v0.0.0-20210506081859-17498ab3e9d7/go.mod h1:JtbX20untAjUVjZs1ZBtq80f5rJWvwtQNRL6EnuYRnY=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=

View File

@@ -2,6 +2,7 @@ package caches
import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs/shared"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/lists"
"strconv"
@@ -10,12 +11,18 @@ import (
var SharedManager = NewManager()
// Manager 缓存策略管理器
type Manager struct {
// 全局配置
MaxDiskCapacity *shared.SizeCapacity
MaxMemoryCapacity *shared.SizeCapacity
policyMap map[int64]*serverconfigs.HTTPCachePolicy // policyId => []*Policy
storageMap map[int64]StorageInterface // policyId => *Storage
locker sync.RWMutex
}
// NewManager 获取管理器对象
func NewManager() *Manager {
return &Manager{
policyMap: map[int64]*serverconfigs.HTTPCachePolicy{},
@@ -23,7 +30,7 @@ func NewManager() *Manager {
}
}
// 重新设置策略
// UpdatePolicies 重新设置策略
func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy) {
this.locker.Lock()
defer this.locker.Unlock()
@@ -103,7 +110,7 @@ func (this *Manager) UpdatePolicies(newPolicies []*serverconfigs.HTTPCachePolicy
}
}
// 获取Policy信息
// FindPolicy 获取Policy信息
func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -112,7 +119,7 @@ func (this *Manager) FindPolicy(policyId int64) *serverconfigs.HTTPCachePolicy {
return p
}
// 根据策略ID查找存储
// FindStorageWithPolicy 根据策略ID查找存储
func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -121,7 +128,7 @@ func (this *Manager) FindStorageWithPolicy(policyId int64) StorageInterface {
return storage
}
// 根据策略获取存储对象
// NewStorageWithPolicy 根据策略获取存储对象
func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy) StorageInterface {
switch policy.Type {
case serverconfigs.CachePolicyStorageFile:
@@ -131,3 +138,27 @@ func (this *Manager) NewStorageWithPolicy(policy *serverconfigs.HTTPCachePolicy)
}
return nil
}
// TotalDiskSize 消耗的磁盘尺寸
func (this *Manager) TotalDiskSize() int64 {
this.locker.RLock()
defer this.locker.RUnlock()
total := int64(0)
for _, storage := range this.storageMap {
total += storage.TotalDiskSize()
}
return total
}
// TotalMemorySize 消耗的内存尺寸
func (this *Manager) TotalMemorySize() int64 {
this.locker.RLock()
defer this.locker.RUnlock()
total := int64(0)
for _, storage := range this.storageMap {
total += storage.TotalMemorySize()
}
return total
}

View File

@@ -40,7 +40,7 @@ var (
ErrInvalidRange = errors.New("invalid range")
)
// 文件缓存
// FileStorage 文件缓存
// 文件结构:
// [expires time] | [ status ] | [url length] | [header length] | [body length] | [url] [header data] [body data]
type FileStorage struct {
@@ -61,12 +61,12 @@ func NewFileStorage(policy *serverconfigs.HTTPCachePolicy) *FileStorage {
}
}
// 获取当前的Policy
// Policy 获取当前的Policy
func (this *FileStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// 初始化
// Init 初始化
func (this *FileStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.TotalSize())
@@ -148,14 +148,13 @@ func (this *FileStorage) Init() error {
// 加载内存缓存
if this.cacheConfig.MemoryPolicy != nil {
memoryCapacity := this.cacheConfig.MemoryPolicy.Capacity
if memoryCapacity != nil && memoryCapacity.Count > 0 {
if this.cacheConfig.MemoryPolicy.Capacity != nil && this.cacheConfig.MemoryPolicy.Capacity.Count > 0 {
memoryPolicy := &serverconfigs.HTTPCachePolicy{
Id: this.policy.Id,
IsOn: this.policy.IsOn,
Name: this.policy.Name,
Description: this.policy.Description,
Capacity: memoryCapacity,
Capacity: this.cacheConfig.MemoryPolicy.Capacity,
MaxKeys: this.policy.MaxKeys,
MaxSize: &shared.SizeCapacity{Count: 128, Unit: shared.SizeCapacityUnitMB}, // TODO 将来可以修改
Type: serverconfigs.CachePolicyStorageMemory,
@@ -214,7 +213,7 @@ func (this *FileStorage) OpenReader(key string) (Reader, error) {
return reader, nil
}
// 打开缓存文件等待写入
// OpenWriter 打开缓存文件等待写入
func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
// 先尝试内存缓存
if this.memoryStorage != nil {
@@ -228,7 +227,8 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write file cache failed: too many keys in cache storage")
}
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
capacityBytes := this.diskCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write file cache failed: over disk size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
@@ -340,7 +340,7 @@ func (this *FileStorage) OpenWriter(key string, expiredAt int64, status int) (Wr
return NewFileWriter(writer, key, expiredAt), nil
}
// 添加到List
// AddToList 添加到List
func (this *FileStorage) AddToList(item *Item) {
if this.memoryStorage != nil {
if item.Type == ItemTypeMemory {
@@ -354,7 +354,7 @@ func (this *FileStorage) AddToList(item *Item) {
this.list.Add(hash, item)
}
// 删除某个键值对应的缓存
// Delete 删除某个键值对应的缓存
func (this *FileStorage) Delete(key string) error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -373,7 +373,7 @@ func (this *FileStorage) Delete(key string) error {
return err
}
// 统计
// Stat 统计
func (this *FileStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -383,7 +383,7 @@ func (this *FileStorage) Stat() (*Stat, error) {
}), nil
}
// 清除所有的缓存
// CleanAll 清除所有的缓存
func (this *FileStorage) CleanAll() error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -441,7 +441,7 @@ func (this *FileStorage) CleanAll() error {
return nil
}
// 清理过期的缓存
// Purge 清理过期的缓存
func (this *FileStorage) Purge(keys []string, urlType string) error {
this.locker.Lock()
defer this.locker.Unlock()
@@ -480,7 +480,7 @@ func (this *FileStorage) Purge(keys []string, urlType string) error {
return nil
}
// 停止
// Stop 停止
func (this *FileStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
@@ -496,6 +496,19 @@ func (this *FileStorage) Stop() {
}
}
// TotalDiskSize 消耗的磁盘尺寸
func (this *FileStorage) TotalDiskSize() int64 {
return atomic.LoadInt64(&this.totalSize)
}
// TotalMemorySize 内存尺寸
func (this *FileStorage) TotalMemorySize() int64 {
if this.memoryStorage == nil {
return 0
}
return this.memoryStorage.TotalMemorySize()
}
// 绝对路径
func (this *FileStorage) dir() string {
return this.cacheConfig.Dir + "/p" + strconv.FormatInt(this.policy.Id, 10) + "/"
@@ -709,3 +722,14 @@ func (this *FileStorage) readN(fp *os.File, buf []byte, total int) (result []byt
}
}
}
func (this *FileStorage) diskCapacityBytes() int64 {
c1 := this.policy.CapacityBytes()
if SharedManager.MaxDiskCapacity != nil {
c2 := SharedManager.MaxDiskCapacity.Bytes()
if c2 > 0 {
return c2
}
}
return c1
}

View File

@@ -4,35 +4,41 @@ import (
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
)
// 缓存存储接口
// StorageInterface 缓存存储接口
type StorageInterface interface {
// 初始化
// Init 初始化
Init() error
// 读取缓存
// OpenReader 读取缓存
OpenReader(key string) (Reader, error)
// 打开缓存写入器等待写入
// OpenWriter 打开缓存写入器等待写入
OpenWriter(key string, expiredAt int64, status int) (Writer, error)
// 删除某个键值对应的缓存
// Delete 删除某个键值对应的缓存
Delete(key string) error
// 统计缓存
// Stat 统计缓存
Stat() (*Stat, error)
// 清除所有缓存
// TotalDiskSize 消耗的磁盘尺寸
TotalDiskSize() int64
// TotalMemorySize 内存尺寸
TotalMemorySize() int64
// CleanAll 清除所有缓存
CleanAll() error
// 批量删除缓存
// Purge 批量删除缓存
Purge(keys []string, urlType string) error
// 停止缓存策略
// Stop 停止缓存策略
Stop()
// 获取当前存储的Policy
// Policy 获取当前存储的Policy
Policy() *serverconfigs.HTTPCachePolicy
// 将缓存添加到列表
// AddToList 将缓存添加到列表
AddToList(item *Item)
}

View File

@@ -39,7 +39,7 @@ func NewMemoryStorage(policy *serverconfigs.HTTPCachePolicy) *MemoryStorage {
}
}
// 初始化
// Init 初始化
func (this *MemoryStorage) Init() error {
this.list.OnAdd(func(item *Item) {
atomic.AddInt64(&this.totalSize, item.Size())
@@ -63,7 +63,7 @@ func (this *MemoryStorage) Init() error {
return nil
}
// 读取缓存
// OpenReader 读取缓存
func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
hash := this.hash(key)
@@ -89,13 +89,14 @@ func (this *MemoryStorage) OpenReader(key string) (Reader, error) {
return nil, ErrNotFound
}
// 打开缓存写入器等待写入
// OpenWriter 打开缓存写入器等待写入
func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (Writer, error) {
// 检查是否超出最大值
if this.policy.MaxKeys > 0 && this.list.Count() > this.policy.MaxKeys {
return nil, errors.New("write memory cache failed: too many keys in cache storage")
}
if this.policy.CapacityBytes() > 0 && this.policy.CapacityBytes() <= this.totalSize {
capacityBytes := this.memoryCapacityBytes()
if capacityBytes > 0 && capacityBytes <= this.totalSize {
return nil, errors.New("write memory cache failed: over memory size, real size: " + strconv.FormatInt(this.totalSize, 10) + " bytes")
}
@@ -108,7 +109,7 @@ func (this *MemoryStorage) OpenWriter(key string, expiredAt int64, status int) (
return NewMemoryWriter(this.valuesMap, key, expiredAt, status, this.locker), nil
}
// 删除某个键值对应的缓存
// Delete 删除某个键值对应的缓存
func (this *MemoryStorage) Delete(key string) error {
hash := this.hash(key)
this.locker.Lock()
@@ -118,7 +119,7 @@ func (this *MemoryStorage) Delete(key string) error {
return nil
}
// 统计缓存
// Stat 统计缓存
func (this *MemoryStorage) Stat() (*Stat, error) {
this.locker.RLock()
defer this.locker.RUnlock()
@@ -128,7 +129,7 @@ func (this *MemoryStorage) Stat() (*Stat, error) {
}), nil
}
// 清除所有缓存
// CleanAll 清除所有缓存
func (this *MemoryStorage) CleanAll() error {
this.locker.Lock()
this.valuesMap = map[uint64]*MemoryItem{}
@@ -138,7 +139,7 @@ func (this *MemoryStorage) CleanAll() error {
return nil
}
// 批量删除缓存
// Purge 批量删除缓存
func (this *MemoryStorage) Purge(keys []string, urlType string) error {
// 目录
if urlType == "dir" {
@@ -158,7 +159,7 @@ func (this *MemoryStorage) Purge(keys []string, urlType string) error {
return nil
}
// 停止缓存策略
// Stop 停止缓存策略
func (this *MemoryStorage) Stop() {
this.locker.Lock()
defer this.locker.Unlock()
@@ -170,18 +171,28 @@ func (this *MemoryStorage) Stop() {
}
}
// 获取当前存储的Policy
// Policy 获取当前存储的Policy
func (this *MemoryStorage) Policy() *serverconfigs.HTTPCachePolicy {
return this.policy
}
// 将缓存添加到列表
// AddToList 将缓存添加到列表
func (this *MemoryStorage) AddToList(item *Item) {
item.MetaSize = int64(len(item.Key)) + 32 /** 32是我们评估的数据结构的长度 **/
hash := fmt.Sprintf("%d", this.hash(item.Key))
this.list.Add(hash, item)
}
// TotalDiskSize 消耗的磁盘尺寸
func (this *MemoryStorage) TotalDiskSize() int64 {
return 0
}
// TotalMemorySize 内存尺寸
func (this *MemoryStorage) TotalMemorySize() int64 {
return atomic.LoadInt64(&this.totalSize)
}
// 计算Key Hash
func (this *MemoryStorage) hash(key string) uint64 {
return xxhash.Sum64String(key)
@@ -198,3 +209,20 @@ func (this *MemoryStorage) purgeLoop() {
}
})
}
func (this *MemoryStorage) memoryCapacityBytes() int64 {
if this.policy == nil {
return 0
}
c1 := int64(0)
if this.policy.Capacity != nil {
c1 = this.policy.Capacity.Bytes()
}
if SharedManager.MaxMemoryCapacity != nil {
c2 := SharedManager.MaxMemoryCapacity.Bytes()
if c2 > 0 {
return c2
}
}
return c1
}

View File

@@ -1,6 +1,8 @@
package caches
import "compress/gzip"
import (
"compress/gzip"
)
type gzipWriter struct {
rawWriter Writer

View File

@@ -1,7 +1,7 @@
package teaconst
const (
Version = "0.0.13"
Version = "0.1.0"
ProductName = "Edge Node"
ProcessName = "edge-node"
@@ -11,6 +11,6 @@ const (
EncryptKey = "8f983f4d69b83aaa0d74b21a212f6967"
EncryptMethod = "aes-256-cfb"
// systemd
// SystemdServiceName systemd
SystemdServiceName = "edge-node"
)

10
internal/monitor/value.go Normal file
View File

@@ -0,0 +1,10 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package monitor
// ItemValue 数据值定义
type ItemValue struct {
Item string
ValueJSON []byte
CreatedAt int64
}

View File

@@ -0,0 +1,79 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package monitor
import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/iwind/TeaGo/maps"
"time"
)
var SharedValueQueue = NewValueQueue()
func init() {
events.On(events.EventStart, func() {
go SharedValueQueue.Start()
})
}
// ValueQueue 数据记录队列
type ValueQueue struct {
valuesChan chan *ItemValue
}
func NewValueQueue() *ValueQueue {
return &ValueQueue{
valuesChan: make(chan *ItemValue, 1024),
}
}
// Start 启动队列
func (this *ValueQueue) Start() {
// 这里单次循环就行因为Loop里已经使用了Range通道
err := this.Loop()
if err != nil {
remotelogs.Error("MONITOR_QUEUE", err.Error())
}
}
// Add 添加数据
func (this *ValueQueue) Add(item string, value maps.Map) {
valueJSON, err := json.Marshal(value)
if err != nil {
remotelogs.Error("MONITOR_QUEUE", "marshal value error: "+err.Error())
return
}
select {
case this.valuesChan <- &ItemValue{
Item: item,
ValueJSON: valueJSON,
CreatedAt: time.Now().Unix(),
}:
default:
}
}
// Loop 单次循环
func (this *ValueQueue) Loop() error {
rpcClient, err := rpc.SharedRPC()
if err != nil {
return err
}
for value := range this.valuesChan {
_, err = rpcClient.NodeValueRPC().CreateNodeValue(rpcClient.Context(), &pb.CreateNodeValueRequest{
Item: value.Item,
ValueJSON: value.ValueJSON,
CreatedAt: value.CreatedAt,
})
if err != nil {
return err
}
}
return nil
}

View File

@@ -5,13 +5,13 @@ import (
"net/http"
)
// HTTP客户端
// HTTPClient HTTP客户端
type HTTPClient struct {
rawClient *http.Client
accessAt int64
}
// 获取新客户端对象
// NewHTTPClient 获取新客户端对象
func NewHTTPClient(rawClient *http.Client) *HTTPClient {
return &HTTPClient{
rawClient: rawClient,
@@ -19,22 +19,22 @@ func NewHTTPClient(rawClient *http.Client) *HTTPClient {
}
}
// 获取原始客户端对象
// RawClient 获取原始客户端对象
func (this *HTTPClient) RawClient() *http.Client {
return this.rawClient
}
// 更新访问时间
// UpdateAccessTime 更新访问时间
func (this *HTTPClient) UpdateAccessTime() {
this.accessAt = utils.UnixTime()
}
// 获取访问时间
// AccessTime 获取访问时间
func (this *HTTPClient) AccessTime() int64 {
return this.accessAt
}
// 关闭
// Close 关闭
func (this *HTTPClient) Close() {
this.rawClient.CloseIdleConnections()
}

View File

@@ -14,17 +14,17 @@ import (
"time"
)
// HTTP客户端池单例
// SharedHTTPClientPool HTTP客户端池单例
var SharedHTTPClientPool = NewHTTPClientPool()
// 客户端池
// HTTPClientPool 客户端池
type HTTPClientPool struct {
clientExpiredDuration time.Duration
clientsMap map[string]*HTTPClient // backend key => client
locker sync.Mutex
}
// 获取新对象
// NewHTTPClientPool 获取新对象
func NewHTTPClientPool() *HTTPClientPool {
pool := &HTTPClientPool{
clientExpiredDuration: 3600 * time.Second,
@@ -36,7 +36,7 @@ func NewHTTPClientPool() *HTTPClientPool {
return pool
}
// 根据地址获取客户端
// Client 根据地址获取客户端
func (this *HTTPClientPool) Client(req *http.Request, origin *serverconfigs.OriginConfig, originAddr string) (rawClient *http.Client, err error) {
if origin.Addr == nil {
return nil, errors.New("origin addr should not be empty (originId:" + strconv.FormatInt(origin.Id, 10) + ")")

View File

@@ -28,7 +28,7 @@ var bytePool1k = utils.NewBytePool(20480, 1024)
var bytePool32k = utils.NewBytePool(20480, 32*1024)
var bytePool128k = utils.NewBytePool(20480, 128*1024)
// HTTP请求
// HTTPRequest HTTP请求
type HTTPRequest struct {
// 外部参数
RawReq *http.Request
@@ -95,7 +95,7 @@ func (this *HTTPRequest) init() {
this.requestFromTime = time.Now()
}
// 执行请求
// Do 执行请求
func (this *HTTPRequest) Do() {
// 初始化
this.init()
@@ -191,6 +191,13 @@ func (this *HTTPRequest) doBegin() {
}
}
// Fastcgi
if this.web.FastcgiRef != nil && this.web.FastcgiRef.IsOn && len(this.web.FastcgiList) > 0 {
if this.doFastcgi() {
return
}
}
// root
if this.web.Root != nil && this.web.Root.IsOn {
// 如果处理成功,则终止请求的处理
@@ -210,9 +217,6 @@ func (this *HTTPRequest) doBegin() {
return
}
// Fastcgi
// TODO
// 返回404页面
this.write404()
}
@@ -229,7 +233,7 @@ func (this *HTTPRequest) doEnd() {
}
}
// 原始的请求URI
// RawURI 原始的请求URI
func (this *HTTPRequest) RawURI() string {
return this.rawURI
}
@@ -332,6 +336,12 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
this.web.StatRef = web.StatRef
}
// fastcgi
if web.FastcgiRef != nil && (web.FastcgiRef.IsPrior || isTop) {
this.web.FastcgiRef = web.FastcgiRef
this.web.FastcgiList = web.FastcgiList
}
// 重写规则
if len(web.RewriteRefs) > 0 {
for index, ref := range web.RewriteRefs {
@@ -433,7 +443,7 @@ func (this *HTTPRequest) configureWeb(web *serverconfigs.HTTPWebConfig, isTop bo
return nil
}
// 利用请求参数格式化字符串
// Format 利用请求参数格式化字符串
func (this *HTTPRequest) Format(source string) string {
if len(source) == 0 {
return ""

View File

@@ -15,6 +15,15 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
if this.web.Cache == nil || !this.web.Cache.IsOn || len(this.web.Cache.CacheRefs) == 0 {
return
}
var addStatusHeader = this.web.Cache.AddStatusHeader
if addStatusHeader {
defer func() {
cacheStatus := this.varMapping["cache.status"]
if cacheStatus != "HIT" {
this.writer.Header().Set("X-Cache", cacheStatus)
}
}()
}
cachePolicy := sharedNodeConfig.HTTPCachePolicy
if cachePolicy == nil || !cachePolicy.IsOn {
@@ -88,6 +97,7 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
}()
this.varMapping["cache.status"] = "HIT"
this.logAttrs["cache.status"] = "HIT"
// 读取Header
headerBuf := []byte{}
@@ -115,6 +125,9 @@ func (this *HTTPRequest) doCacheRead() (shouldStop bool) {
return
}
if addStatusHeader {
this.writer.Header().Set("X-Cache", "HIT")
}
this.processResponseHeaders(reader.Status())
// 输出Body

View File

@@ -0,0 +1,213 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"errors"
"fmt"
"github.com/TeaOSLab/EdgeCommon/pkg/serverconfigs"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/iwind/TeaGo/Tea"
"github.com/iwind/TeaGo/maps"
"github.com/iwind/TeaGo/rands"
"github.com/iwind/TeaGo/types"
"github.com/iwind/gofcgi/pkg"
"io"
"net"
"net/url"
"path/filepath"
"strings"
)
func (this *HTTPRequest) doFastcgi() (shouldStop bool) {
fastcgiList := []*serverconfigs.HTTPFastcgiConfig{}
for _, fastcgi := range this.web.FastcgiList {
if !fastcgi.IsOn {
continue
}
fastcgiList = append(fastcgiList, fastcgi)
}
if len(fastcgiList) == 0 {
return false
}
shouldStop = true
fastcgi := fastcgiList[rands.Int(0, len(fastcgiList)-1)]
env := fastcgi.FilterParams()
if !env.Has("DOCUMENT_ROOT") {
env["DOCUMENT_ROOT"] = ""
}
if !env.Has("REMOTE_ADDR") {
env["REMOTE_ADDR"] = this.requestRemoteAddr()
}
if !env.Has("QUERY_STRING") {
u, err := url.ParseRequestURI(this.uri)
if err == nil {
env["QUERY_STRING"] = u.RawQuery
} else {
env["QUERY_STRING"] = this.RawReq.URL.RawQuery
}
}
if !env.Has("SERVER_NAME") {
env["SERVER_NAME"] = this.Host
}
if !env.Has("REQUEST_URI") {
env["REQUEST_URI"] = this.uri
}
if !env.Has("HOST") {
env["HOST"] = this.Host
}
if len(this.ServerAddr) > 0 {
if !env.Has("SERVER_ADDR") {
env["SERVER_ADDR"] = this.ServerAddr
}
if !env.Has("SERVER_PORT") {
_, port, err := net.SplitHostPort(this.ServerAddr)
if err == nil {
env["SERVER_PORT"] = port
}
}
}
// 连接池配置
poolSize := fastcgi.PoolSize
if poolSize <= 0 {
poolSize = 32
}
client, err := pkg.SharedPool(fastcgi.Network(), fastcgi.RealAddress(), uint(poolSize)).Client()
if err != nil {
this.write500(err)
return
}
// 请求相关
if !env.Has("REQUEST_METHOD") {
env["REQUEST_METHOD"] = this.RawReq.Method
}
if !env.Has("CONTENT_LENGTH") {
env["CONTENT_LENGTH"] = fmt.Sprintf("%d", this.RawReq.ContentLength)
}
if !env.Has("CONTENT_TYPE") {
env["CONTENT_TYPE"] = this.RawReq.Header.Get("Content-Type")
}
if !env.Has("SERVER_SOFTWARE") {
env["SERVER_SOFTWARE"] = teaconst.ProductName + "/v" + teaconst.Version
}
// 处理SCRIPT_FILENAME
scriptPath := env.GetString("SCRIPT_FILENAME")
if len(scriptPath) > 0 && (strings.Index(scriptPath, "/") < 0 && strings.Index(scriptPath, "\\") < 0) {
env["SCRIPT_FILENAME"] = env.GetString("DOCUMENT_ROOT") + Tea.DS + scriptPath
}
scriptFilename := filepath.Base(this.RawReq.URL.Path)
// PATH_INFO
pathInfoReg := fastcgi.PathInfoRegexp()
pathInfo := ""
if pathInfoReg != nil {
matches := pathInfoReg.FindStringSubmatch(this.RawReq.URL.Path)
countMatches := len(matches)
if countMatches == 1 {
pathInfo = matches[0]
} else if countMatches == 2 {
pathInfo = matches[1]
} else if countMatches > 2 {
scriptFilename = matches[1]
pathInfo = matches[2]
}
if !env.Has("PATH_INFO") {
env["PATH_INFO"] = pathInfo
}
}
this.addVarMapping(map[string]string{
"fastcgi.documentRoot": env.GetString("DOCUMENT_ROOT"),
"fastcgi.filename": scriptFilename,
"fastcgi.pathInfo": pathInfo,
})
params := map[string]string{}
for key, value := range env {
params[key] = this.Format(types.String(value))
}
this.processRequestHeaders(this.RawReq.Header)
for k, v := range this.RawReq.Header {
if k == "Connection" {
continue
}
for _, subV := range v {
params["HTTP_"+strings.ToUpper(strings.Replace(k, "-", "_", -1))] = subV
}
}
host, found := params["HTTP_HOST"]
if !found || len(host) == 0 {
params["HTTP_HOST"] = this.Host
}
fcgiReq := pkg.NewRequest()
fcgiReq.SetTimeout(fastcgi.ReadTimeoutDuration())
fcgiReq.SetParams(params)
fcgiReq.SetBody(this.RawReq.Body, uint32(this.requestLength()))
resp, stderr, err := client.Call(fcgiReq)
if err != nil {
this.write500(err)
return
}
if len(stderr) > 0 {
err := errors.New("Fastcgi Error: " + strings.TrimSpace(string(stderr)) + " script: " + maps.NewMap(params).GetString("SCRIPT_FILENAME"))
this.write500(err)
return
}
defer func() {
_ = resp.Body.Close()
}()
// 设置Charset
// TODO 这里应该可以设置文本类型的列表,以及是否强制覆盖所有文本类型的字符集
if this.web.Charset != nil && this.web.Charset.IsOn && len(this.web.Charset.Charset) > 0 {
contentTypes, ok := resp.Header["Content-Type"]
if ok && len(contentTypes) > 0 {
contentType := contentTypes[0]
if _, found := textMimeMap[contentType]; found {
resp.Header["Content-Type"][0] = contentType + "; charset=" + this.web.Charset.Charset
}
}
}
// 响应Header
this.writer.AddHeaders(resp.Header)
this.processResponseHeaders(resp.StatusCode)
// 准备
this.writer.Prepare(resp.ContentLength)
// 设置响应代码
this.writer.WriteHeader(resp.StatusCode)
// 输出到客户端
pool := this.bytePool(resp.ContentLength)
buf := pool.Get()
_, err = io.CopyBuffer(this.writer, resp.Body, buf)
pool.Put(buf)
err1 := resp.Body.Close()
if err1 != nil {
remotelogs.Error("REQUEST_REVERSE_PROXY", err1.Error())
}
if err != nil && err != io.EOF {
remotelogs.Error("REQUEST_REVERSE_PROXY", err.Error())
this.addError(err)
}
return
}

View File

@@ -147,6 +147,12 @@ func (this *HTTPRequest) doReverseProxy() {
return
}
// 在HTTP/2下需要防止因为requestBody而导致Content-Length为空的问题
if this.RawReq.ProtoMajor == 2 && this.RawReq.ContentLength == 0 {
_ = this.RawReq.Body.Close()
this.RawReq.Body = nil
}
// 开始请求
resp, err := client.Do(this.RawReq)
if err != nil {

View File

@@ -304,7 +304,7 @@ func (this *HTTPWriter) prepareGzip(size int64) {
// 准备缓存
func (this *HTTPWriter) prepareCache(size int64) {
if this.writer == nil || size <= 0 {
if this.writer == nil {
return
}
@@ -319,10 +319,16 @@ func (this *HTTPWriter) prepareCache(size int64) {
}
cacheRef := this.req.cacheRef
if cacheRef == nil ||
!cacheRef.IsOn ||
(cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) ||
(cachePolicy.MaxSizeBytes() > 0 && size > cachePolicy.MaxSizeBytes()) {
if cacheRef == nil || !cacheRef.IsOn {
return
}
// 如果允许 ChunkedEncoding就无需尺寸的判断因为此时的 size 为 -1
if !cacheRef.AllowChunkedEncoding && size < 0 {
return
}
if size >= 0 && ((cacheRef.MaxSizeBytes() > 0 && size > cacheRef.MaxSizeBytes()) ||
(cachePolicy.MaxSizeBytes() > 0 && size > cachePolicy.MaxSizeBytes())) {
return
}

View File

@@ -48,6 +48,7 @@ func (this *Listener) Listen() error {
if err != nil {
return err
}
netListener = NewTrafficListener(netListener)
events.On(events.EventQuit, func() {
remotelogs.Println("LISTENER", "quit "+this.group.FullAddr())
_ = netListener.Close()

View File

@@ -362,6 +362,8 @@ func (this *Node) syncConfig() error {
}
nodeconfigs.ResetNodeConfig(nodeConfig)
caches.SharedManager.MaxDiskCapacity = nodeConfig.MaxCacheDiskCapacity
caches.SharedManager.MaxMemoryCapacity = nodeConfig.MaxCacheMemoryCapacity
if nodeConfig.HTTPCachePolicy != nil {
caches.SharedManager.UpdatePolicies([]*serverconfigs.HTTPCachePolicy{nodeConfig.HTTPCachePolicy})
} else {

View File

@@ -4,12 +4,15 @@ import (
"encoding/json"
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/caches"
teaconst "github.com/TeaOSLab/EdgeNode/internal/const"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/TeaOSLab/EdgeNode/internal/remotelogs"
"github.com/TeaOSLab/EdgeNode/internal/rpc"
"github.com/TeaOSLab/EdgeNode/internal/utils"
"github.com/iwind/TeaGo/lists"
"github.com/iwind/TeaGo/maps"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/disk"
"os"
@@ -62,6 +65,13 @@ func (this *NodeStatusExecutor) update() {
status.ConfigVersion = sharedNodeConfig.Version
status.IsActive = true
status.ConnectionCount = sharedListenerManager.TotalActiveConnections()
status.CacheTotalDiskSize = caches.SharedManager.TotalDiskSize()
status.CacheTotalMemorySize = caches.SharedManager.TotalMemorySize()
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemConnections, maps.Map{
"total": status.ConnectionCount,
})
hostname, _ := os.Hostname()
status.Hostname = hostname
@@ -108,6 +118,11 @@ func (this *NodeStatusExecutor) updateCPU(status *nodeconfigs.NodeStatus) {
}
status.CPUUsage = percents[0] / 100
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemCPU, maps.Map{
"usage": status.CPUUsage,
})
if this.cpuLogicalCount == 0 && this.cpuPhysicalCount == 0 {
this.cpuUpdatedTime = time.Now()
@@ -188,4 +203,11 @@ func (this *NodeStatusExecutor) updateDisk(status *nodeconfigs.NodeStatus) {
status.DiskTotal = total
status.DiskUsage = float64(totalUsage) / float64(total)
status.DiskMaxUsage = maxUsage / 100
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemDisk, maps.Map{
"total": status.DiskTotal,
"usage": status.DiskUsage,
"maxUsage": status.DiskMaxUsage,
})
}

View File

@@ -4,6 +4,8 @@ package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/iwind/TeaGo/maps"
"github.com/shirou/gopsutil/load"
"github.com/shirou/gopsutil/mem"
)
@@ -22,6 +24,13 @@ func (this *NodeStatusExecutor) updateMem(status *nodeconfigs.NodeStatus) {
}
status.MemoryTotal = stat.Total
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemMemory, maps.Map{
"usage": status.MemoryUsage,
"total": status.MemoryTotal,
"used": stat.Used,
})
}
// 更新负载
@@ -38,4 +47,11 @@ func (this *NodeStatusExecutor) updateLoad(status *nodeconfigs.NodeStatus) {
status.Load1m = stat.Load1
status.Load5m = stat.Load5
status.Load15m = stat.Load15
// 记录监控数据
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemLoad, maps.Map{
"load1m": status.Load1m,
"load5m": status.Load5m,
"load15m": status.Load15m,
})
}

View File

@@ -0,0 +1,92 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import (
"github.com/TeaOSLab/EdgeCommon/pkg/nodeconfigs"
"github.com/TeaOSLab/EdgeNode/internal/events"
"github.com/TeaOSLab/EdgeNode/internal/monitor"
"github.com/iwind/TeaGo/maps"
"net"
"sync/atomic"
"time"
)
// 流量统计
var inTrafficBytes = uint64(0)
var outTrafficBytes = uint64(0)
// 发送监控流量
func init() {
events.On(events.EventStart, func() {
ticker := time.NewTicker(1 * time.Minute)
go func() {
for range ticker.C {
// 加入到数据队列中
if inTrafficBytes > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficIn, maps.Map{
"total": inTrafficBytes,
})
}
if outTrafficBytes > 0 {
monitor.SharedValueQueue.Add(nodeconfigs.NodeValueItemTrafficOut, maps.Map{
"total": outTrafficBytes,
})
}
// 重置数据
atomic.StoreUint64(&inTrafficBytes, 0)
atomic.StoreUint64(&outTrafficBytes, 0)
}
}()
})
}
// TrafficConn 用于统计流量的连接
type TrafficConn struct {
rawConn net.Conn
}
func NewTrafficConn(conn net.Conn) net.Conn {
return &TrafficConn{rawConn: conn}
}
func (this *TrafficConn) Read(b []byte) (n int, err error) {
n, err = this.rawConn.Read(b)
if n > 0 {
atomic.AddUint64(&inTrafficBytes, uint64(n))
}
return
}
func (this *TrafficConn) Write(b []byte) (n int, err error) {
n, err = this.rawConn.Write(b)
if n > 0 {
atomic.AddUint64(&outTrafficBytes, uint64(n))
}
return
}
func (this *TrafficConn) Close() error {
return this.rawConn.Close()
}
func (this *TrafficConn) LocalAddr() net.Addr {
return this.rawConn.LocalAddr()
}
func (this *TrafficConn) RemoteAddr() net.Addr {
return this.rawConn.RemoteAddr()
}
func (this *TrafficConn) SetDeadline(t time.Time) error {
return this.rawConn.SetDeadline(t)
}
func (this *TrafficConn) SetReadDeadline(t time.Time) error {
return this.rawConn.SetReadDeadline(t)
}
func (this *TrafficConn) SetWriteDeadline(t time.Time) error {
return this.rawConn.SetWriteDeadline(t)
}

View File

@@ -0,0 +1,30 @@
// Copyright 2021 Liuxiangchao iwind.liu@gmail.com. All rights reserved.
package nodes
import "net"
// TrafficListener 用于统计流量的网络监听
type TrafficListener struct {
rawListener net.Listener
}
func NewTrafficListener(listener net.Listener) net.Listener {
return &TrafficListener{rawListener: listener}
}
func (this *TrafficListener) Accept() (net.Conn, error) {
conn, err := this.rawListener.Accept()
if err != nil {
return nil, err
}
return NewTrafficConn(conn), nil
}
func (this *TrafficListener) Close() error {
return this.rawListener.Close()
}
func (this *TrafficListener) Addr() net.Addr {
return this.rawListener.Addr()
}

View File

@@ -57,6 +57,10 @@ func (this *RPCClient) NodeTaskRPC() pb.NodeTaskServiceClient {
return pb.NewNodeTaskServiceClient(this.pickConn())
}
func (this *RPCClient) NodeValueRPC() pb.NodeValueServiceClient {
return pb.NewNodeValueServiceClient(this.pickConn())
}
func (this *RPCClient) HTTPAccessLogRPC() pb.HTTPAccessLogServiceClient {
return pb.NewHTTPAccessLogServiceClient(this.pickConn())
}
@@ -105,7 +109,7 @@ func (this *RPCClient) ServerDailyStatRPC() pb.ServerDailyStatServiceClient {
return pb.NewServerDailyStatServiceClient(this.pickConn())
}
// 节点上下文信息
// Context 节点上下文信息
func (this *RPCClient) Context() context.Context {
ctx := context.Background()
m := maps.Map{
@@ -128,7 +132,7 @@ func (this *RPCClient) Context() context.Context {
return ctx
}
// 集群上下文
// ClusterContext 集群上下文
func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) context.Context {
ctx := context.Background()
m := maps.Map{
@@ -151,14 +155,14 @@ func (this *RPCClient) ClusterContext(clusterId string, clusterSecret string) co
return ctx
}
// 关闭连接
// Close 关闭连接
func (this *RPCClient) Close() {
for _, conn := range this.conns {
_ = conn.Close()
}
}
// 修改配置
// UpdateConfig 修改配置
func (this *RPCClient) UpdateConfig(config *configs.APIConfig) error {
this.apiConfig = config
return this.init()

View File

@@ -1,55 +0,0 @@
package rpc
import (
"github.com/TeaOSLab/EdgeCommon/pkg/rpc/pb"
"github.com/TeaOSLab/EdgeNode/internal/configs"
_ "github.com/iwind/TeaGo/bootstrap"
"testing"
"time"
)
func TestRPCClient_NodeRPC(t *testing.T) {
before := time.Now()
defer func() {
t.Log(time.Since(before).Seconds()*1000, "ms")
}()
config, err := configs.LoadAPIConfig()
if err != nil {
t.Fatal(err)
}
rpc, err := NewRPCClient(config)
if err != nil {
t.Fatal(err)
}
resp, err := rpc.NodeRPC().ComposeNodeConfig(rpc.Context(), &pb.ComposeNodeConfigRequest{})
if err != nil {
t.Fatal(err)
}
t.Log(resp)
}
func TestSharedRPC_Stream(t *testing.T) {
config, err := configs.LoadAPIConfig()
if err != nil {
t.Fatal(err)
}
rpc, err := NewRPCClient(config)
if err != nil {
t.Fatal(err)
}
client, err := rpc.NodeRPC().NodeStream(rpc.Context())
if err != nil {
t.Fatal(err)
}
err = client.Send(&pb.NodeStreamRequest{})
if err != nil {
t.Fatal(err)
}
for {
resp, err := client.Recv()
if err != nil {
t.Fatal(err)
}
t.Log("recv:", resp)
}
}

View File

@@ -15,14 +15,14 @@ import (
var SharedTrafficStatManager = NewTrafficStatManager()
// 区域流量统计
// TrafficStatManager 区域流量统计
type TrafficStatManager struct {
m map[string]int64 // [timestamp serverId] => bytes
locker sync.Mutex
configFunc func() *nodeconfigs.NodeConfig
}
// 获取新对象
// NewTrafficStatManager 获取新对象
func NewTrafficStatManager() *TrafficStatManager {
manager := &TrafficStatManager{
m: map[string]int64{},
@@ -31,7 +31,7 @@ func NewTrafficStatManager() *TrafficStatManager {
return manager
}
// 启动自动任务
// Start 启动自动任务
func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig) {
this.configFunc = configFunc
@@ -54,7 +54,7 @@ func (this *TrafficStatManager) Start(configFunc func() *nodeconfigs.NodeConfig)
}
}
// 添加流量
// Add 添加流量
func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
if bytes == 0 {
return
@@ -68,7 +68,7 @@ func (this *TrafficStatManager) Add(serverId int64, bytes int64) {
this.locker.Unlock()
}
// 上传流量
// Upload 上传流量
func (this *TrafficStatManager) Upload() error {
config := this.configFunc()
if config == nil {

View File

@@ -23,13 +23,17 @@ func (this *Request) Raw() *http.Request {
}
func (this *Request) ReadBody(max int64) (data []byte, err error) {
data, err = ioutil.ReadAll(io.LimitReader(this.Request.Body, max))
if this.Request.ContentLength > 0 {
data, err = ioutil.ReadAll(io.LimitReader(this.Request.Body, max))
}
return
}
func (this *Request) RestoreBody(data []byte) {
rawReader := bytes.NewBuffer(data)
buf := make([]byte, 1024)
io.CopyBuffer(rawReader, this.Request.Body, buf)
this.Request.Body = ioutil.NopCloser(rawReader)
if len(data) > 0 {
rawReader := bytes.NewBuffer(data)
buf := make([]byte, 1024)
_, _ = io.CopyBuffer(rawReader, this.Request.Body, buf)
this.Request.Body = ioutil.NopCloser(rawReader)
}
}