refactor: 优化代码组织结构
This commit is contained in:
		
							parent
							
								
									3d5e292d26
								
							
						
					
					
						commit
						c65028188c
					
				@ -1,196 +1,40 @@
 | 
				
			|||||||
package main
 | 
					package main
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"bytes"
 | 
					 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
					 | 
				
			||||||
	"log"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"github.com/ory/graceful"
 | 
					 | 
				
			||||||
	"github.com/patrickmn/go-cache"
 | 
					 | 
				
			||||||
	"github.com/robfig/cron/v3"
 | 
					 | 
				
			||||||
	"gorm.io/driver/sqlite"
 | 
					 | 
				
			||||||
	"gorm.io/gorm"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	"github.com/naiba/nezha/cmd/dashboard/controller"
 | 
						"github.com/naiba/nezha/cmd/dashboard/controller"
 | 
				
			||||||
	"github.com/naiba/nezha/cmd/dashboard/rpc"
 | 
						"github.com/naiba/nezha/cmd/dashboard/rpc"
 | 
				
			||||||
	"github.com/naiba/nezha/model"
 | 
						"github.com/naiba/nezha/model"
 | 
				
			||||||
	"github.com/naiba/nezha/service/singleton"
 | 
						"github.com/naiba/nezha/service/singleton"
 | 
				
			||||||
 | 
						"github.com/ory/graceful"
 | 
				
			||||||
 | 
						"log"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func init() {
 | 
					func init() {
 | 
				
			||||||
	shanghai, err := time.LoadLocation("Asia/Shanghai")
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// 初始化 dao 包
 | 
						// 初始化 dao 包
 | 
				
			||||||
	singleton.Init()
 | 
						singleton.Init()
 | 
				
			||||||
	singleton.Conf = &model.Config{}
 | 
						singleton.InitConfigFromPath("data/config.yaml")
 | 
				
			||||||
	singleton.Cron = cron.New(cron.WithSeconds(), cron.WithLocation(shanghai))
 | 
						singleton.InitDBFromPath("data/sqlite.db")
 | 
				
			||||||
	singleton.Crons = make(map[uint64]*model.Cron)
 | 
					 | 
				
			||||||
	singleton.ServerList = make(map[uint64]*model.Server)
 | 
					 | 
				
			||||||
	singleton.SecretToID = make(map[string]uint64)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	err = singleton.Conf.Read("data/config.yaml")
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	singleton.DB, err = gorm.Open(sqlite.Open("data/sqlite.db"), &gorm.Config{
 | 
					 | 
				
			||||||
		CreateBatchSize: 200,
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if singleton.Conf.Debug {
 | 
					 | 
				
			||||||
		singleton.DB = singleton.DB.Debug()
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if singleton.Conf.GRPCPort == 0 {
 | 
					 | 
				
			||||||
		singleton.Conf.GRPCPort = 5555
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	singleton.Cache = cache.New(5*time.Minute, 10*time.Minute)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	initSystem()
 | 
						initSystem()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func initSystem() {
 | 
					func initSystem() {
 | 
				
			||||||
	singleton.DB.AutoMigrate(model.Server{}, model.User{},
 | 
						// 启动 singleton 包下的所有服务
 | 
				
			||||||
		model.Notification{}, model.AlertRule{}, model.Monitor{},
 | 
						singleton.LoadSingleton()
 | 
				
			||||||
		model.MonitorHistory{}, model.Cron{}, model.Transfer{})
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	singleton.LoadNotifications()
 | 
					 | 
				
			||||||
	loadServers() //加载服务器列表
 | 
					 | 
				
			||||||
	loadCrons()   //加载计划任务
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 每天的3:30 对 监控记录 和 流量记录 进行清理
 | 
						// 每天的3:30 对 监控记录 和 流量记录 进行清理
 | 
				
			||||||
	_, err := singleton.Cron.AddFunc("0 30 3 * * *", cleanMonitorHistory)
 | 
						if _, err := singleton.Cron.AddFunc("0 30 3 * * *", singleton.CleanMonitorHistory); err != nil {
 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 每小时对流量记录进行打点
 | 
						// 每小时对流量记录进行打点
 | 
				
			||||||
	_, err = singleton.Cron.AddFunc("0 0 * * * *", recordTransferHourlyUsage)
 | 
						if _, err := singleton.Cron.AddFunc("0 0 * * * *", singleton.RecordTransferHourlyUsage); err != nil {
 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		panic(err)
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// recordTransferHourlyUsage 对流量记录进行打点
 | 
					 | 
				
			||||||
func recordTransferHourlyUsage() {
 | 
					 | 
				
			||||||
	singleton.ServerLock.Lock()
 | 
					 | 
				
			||||||
	defer singleton.ServerLock.Unlock()
 | 
					 | 
				
			||||||
	now := time.Now()
 | 
					 | 
				
			||||||
	nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 | 
					 | 
				
			||||||
	var txs []model.Transfer
 | 
					 | 
				
			||||||
	for id, server := range singleton.ServerList {
 | 
					 | 
				
			||||||
		tx := model.Transfer{
 | 
					 | 
				
			||||||
			ServerID: id,
 | 
					 | 
				
			||||||
			In:       server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn),
 | 
					 | 
				
			||||||
			Out:      server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut),
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if tx.In == 0 && tx.Out == 0 {
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		server.PrevHourlyTransferIn = int64(server.State.NetInTransfer)
 | 
					 | 
				
			||||||
		server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer)
 | 
					 | 
				
			||||||
		tx.CreatedAt = nowTrimSeconds
 | 
					 | 
				
			||||||
		txs = append(txs, tx)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(txs) == 0 {
 | 
					 | 
				
			||||||
		return
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	log.Println("NEZHA>> Cron 流量统计入库", len(txs), singleton.DB.Create(txs).Error)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// cleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录
 | 
					 | 
				
			||||||
func cleanMonitorHistory() {
 | 
					 | 
				
			||||||
	// 清理已被删除的服务器的监控记录与流量记录
 | 
					 | 
				
			||||||
	singleton.DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30))
 | 
					 | 
				
			||||||
	singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)")
 | 
					 | 
				
			||||||
	// 计算可清理流量记录的时长
 | 
					 | 
				
			||||||
	var allServerKeep time.Time
 | 
					 | 
				
			||||||
	specialServerKeep := make(map[uint64]time.Time)
 | 
					 | 
				
			||||||
	var specialServerIDs []uint64
 | 
					 | 
				
			||||||
	var alerts []model.AlertRule
 | 
					 | 
				
			||||||
	singleton.DB.Find(&alerts)
 | 
					 | 
				
			||||||
	for i := 0; i < len(alerts); i++ {
 | 
					 | 
				
			||||||
		for j := 0; j < len(alerts[i].Rules); j++ {
 | 
					 | 
				
			||||||
			// 是不是流量记录规则
 | 
					 | 
				
			||||||
			if !alerts[i].Rules[j].IsTransferDurationRule() {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart()
 | 
					 | 
				
			||||||
			// 判断规则影响的机器范围
 | 
					 | 
				
			||||||
			if alerts[i].Rules[j].Cover == model.RuleCoverAll {
 | 
					 | 
				
			||||||
				// 更新全局可以清理的数据点
 | 
					 | 
				
			||||||
				if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) {
 | 
					 | 
				
			||||||
					allServerKeep = dataCouldRemoveBefore
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			} else {
 | 
					 | 
				
			||||||
				// 更新特定机器可以清理数据点
 | 
					 | 
				
			||||||
				for id := range alerts[i].Rules[j].Ignore {
 | 
					 | 
				
			||||||
					if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) {
 | 
					 | 
				
			||||||
						specialServerKeep[id] = dataCouldRemoveBefore
 | 
					 | 
				
			||||||
						specialServerIDs = append(specialServerIDs, id)
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for id, couldRemove := range specialServerKeep {
 | 
					 | 
				
			||||||
		singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if allServerKeep.IsZero() {
 | 
					 | 
				
			||||||
		singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs)
 | 
					 | 
				
			||||||
	} else {
 | 
					 | 
				
			||||||
		singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
//loadServers 加载服务器列表并根据ID排序
 | 
					 | 
				
			||||||
func loadServers() {
 | 
					 | 
				
			||||||
	var servers []model.Server
 | 
					 | 
				
			||||||
	singleton.DB.Find(&servers)
 | 
					 | 
				
			||||||
	for _, s := range servers {
 | 
					 | 
				
			||||||
		innerS := s
 | 
					 | 
				
			||||||
		innerS.Host = &model.Host{}
 | 
					 | 
				
			||||||
		innerS.State = &model.HostState{}
 | 
					 | 
				
			||||||
		singleton.ServerList[innerS.ID] = &innerS
 | 
					 | 
				
			||||||
		singleton.SecretToID[innerS.Secret] = innerS.ID
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	singleton.ReSortServer()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// loadCrons 加载计划任务
 | 
					 | 
				
			||||||
func loadCrons() {
 | 
					 | 
				
			||||||
	var crons []model.Cron
 | 
					 | 
				
			||||||
	singleton.DB.Find(&crons)
 | 
					 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
	errMsg := new(bytes.Buffer)
 | 
					 | 
				
			||||||
	for i := 0; i < len(crons); i++ {
 | 
					 | 
				
			||||||
		cr := crons[i]
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// 注册计划任务
 | 
					 | 
				
			||||||
		cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr))
 | 
					 | 
				
			||||||
		if err == nil {
 | 
					 | 
				
			||||||
			singleton.Crons[cr.ID] = &cr
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			if errMsg.Len() == 0 {
 | 
					 | 
				
			||||||
				errMsg.WriteString("调度失败的计划任务:[")
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			errMsg.WriteString(fmt.Sprintf("%d,", cr.ID))
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if errMsg.Len() > 0 {
 | 
					 | 
				
			||||||
		msg := errMsg.String()
 | 
					 | 
				
			||||||
		singleton.SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false)
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	singleton.Cron.Start()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func main() {
 | 
					func main() {
 | 
				
			||||||
	cleanMonitorHistory()
 | 
						singleton.CleanMonitorHistory()
 | 
				
			||||||
	go rpc.ServeRPC(singleton.Conf.GRPCPort)
 | 
						go rpc.ServeRPC(singleton.Conf.GRPCPort)
 | 
				
			||||||
	serviceSentinelDispatchBus := make(chan model.Monitor) // 用于传递服务监控任务信息的channel
 | 
						serviceSentinelDispatchBus := make(chan model.Monitor) // 用于传递服务监控任务信息的channel
 | 
				
			||||||
	go rpc.DispatchTask(serviceSentinelDispatchBus)
 | 
						go rpc.DispatchTask(serviceSentinelDispatchBus)
 | 
				
			||||||
@ -202,7 +46,7 @@ func main() {
 | 
				
			|||||||
		return srv.ListenAndServe()
 | 
							return srv.ListenAndServe()
 | 
				
			||||||
	}, func(c context.Context) error {
 | 
						}, func(c context.Context) error {
 | 
				
			||||||
		log.Println("NEZHA>> Graceful::START")
 | 
							log.Println("NEZHA>> Graceful::START")
 | 
				
			||||||
		recordTransferHourlyUsage()
 | 
							singleton.RecordTransferHourlyUsage()
 | 
				
			||||||
		log.Println("NEZHA>> Graceful::END")
 | 
							log.Println("NEZHA>> Graceful::END")
 | 
				
			||||||
		srv.Shutdown(c)
 | 
							srv.Shutdown(c)
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
 | 
				
			|||||||
@ -98,6 +98,9 @@ func (c *Config) Read(path string) error {
 | 
				
			|||||||
	if c.Site.Theme == "" {
 | 
						if c.Site.Theme == "" {
 | 
				
			||||||
		c.Site.Theme = "default"
 | 
							c.Site.Theme = "default"
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if c.GRPCPort == 0 {
 | 
				
			||||||
 | 
							c.GRPCPort = 5555
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	c.updateIgnoredIPNotificationID()
 | 
						c.updateIgnoredIPNotificationID()
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										81
									
								
								service/singleton/crontask.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										81
									
								
								service/singleton/crontask.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,81 @@
 | 
				
			|||||||
 | 
					package singleton
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"bytes"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"github.com/naiba/nezha/model"
 | 
				
			||||||
 | 
						pb "github.com/naiba/nezha/proto"
 | 
				
			||||||
 | 
						"github.com/robfig/cron/v3"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						Cron     *cron.Cron
 | 
				
			||||||
 | 
						Crons    map[uint64]*model.Cron
 | 
				
			||||||
 | 
						CronLock sync.RWMutex
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func InitCronTask() {
 | 
				
			||||||
 | 
						Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc))
 | 
				
			||||||
 | 
						Crons = make(map[uint64]*model.Cron)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// LoadCronTasks 加载计划任务
 | 
				
			||||||
 | 
					func LoadCronTasks() {
 | 
				
			||||||
 | 
						InitCronTask()
 | 
				
			||||||
 | 
						var crons []model.Cron
 | 
				
			||||||
 | 
						DB.Find(&crons)
 | 
				
			||||||
 | 
						var err error
 | 
				
			||||||
 | 
						errMsg := new(bytes.Buffer)
 | 
				
			||||||
 | 
						for i := 0; i < len(crons); i++ {
 | 
				
			||||||
 | 
							cr := crons[i]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// 注册计划任务
 | 
				
			||||||
 | 
							cr.CronJobID, err = Cron.AddFunc(cr.Scheduler, CronTrigger(cr))
 | 
				
			||||||
 | 
							if err == nil {
 | 
				
			||||||
 | 
								Crons[cr.ID] = &cr
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								if errMsg.Len() == 0 {
 | 
				
			||||||
 | 
									errMsg.WriteString("调度失败的计划任务:[")
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								errMsg.WriteString(fmt.Sprintf("%d,", cr.ID))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if errMsg.Len() > 0 {
 | 
				
			||||||
 | 
							msg := errMsg.String()
 | 
				
			||||||
 | 
							SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						Cron.Start()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func ManualTrigger(c model.Cron) {
 | 
				
			||||||
 | 
						CronTrigger(c)()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func CronTrigger(cr model.Cron) func() {
 | 
				
			||||||
 | 
						crIgnoreMap := make(map[uint64]bool)
 | 
				
			||||||
 | 
						for j := 0; j < len(cr.Servers); j++ {
 | 
				
			||||||
 | 
							crIgnoreMap[cr.Servers[j]] = true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return func() {
 | 
				
			||||||
 | 
							ServerLock.RLock()
 | 
				
			||||||
 | 
							defer ServerLock.RUnlock()
 | 
				
			||||||
 | 
							for _, s := range ServerList {
 | 
				
			||||||
 | 
								if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] {
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] {
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if s.TaskStream != nil {
 | 
				
			||||||
 | 
									s.TaskStream.Send(&pb.Task{
 | 
				
			||||||
 | 
										Id:   cr.ID,
 | 
				
			||||||
 | 
										Data: cr.Command,
 | 
				
			||||||
 | 
										Type: model.TaskTypeCommand,
 | 
				
			||||||
 | 
									})
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -16,7 +16,7 @@ const firstNotificationDelay = time.Minute * 15
 | 
				
			|||||||
var notifications []model.Notification
 | 
					var notifications []model.Notification
 | 
				
			||||||
var notificationsLock sync.RWMutex
 | 
					var notificationsLock sync.RWMutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// LoadNotifications 加载通知方式到 singleton.notifications 变量
 | 
					// LoadNotifications 从 DB 加载通知方式到 singleton.notifications 变量
 | 
				
			||||||
func LoadNotifications() {
 | 
					func LoadNotifications() {
 | 
				
			||||||
	notificationsLock.Lock()
 | 
						notificationsLock.Lock()
 | 
				
			||||||
	if err := DB.Find(¬ifications).Error; err != nil {
 | 
						if err := DB.Find(¬ifications).Error; err != nil {
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										58
									
								
								service/singleton/server.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										58
									
								
								service/singleton/server.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,58 @@
 | 
				
			|||||||
 | 
					package singleton
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/naiba/nezha/model"
 | 
				
			||||||
 | 
						"sort"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						ServerList map[uint64]*model.Server // [ServerID] -> model.Server
 | 
				
			||||||
 | 
						SecretToID map[string]uint64        // [ServerSecret] -> ServerID
 | 
				
			||||||
 | 
						ServerLock sync.RWMutex
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序
 | 
				
			||||||
 | 
						SortedServerLock sync.RWMutex
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// InitServer 初始化 ServerID <-> Secret 的映射
 | 
				
			||||||
 | 
					func InitServer() {
 | 
				
			||||||
 | 
						ServerList = make(map[uint64]*model.Server)
 | 
				
			||||||
 | 
						SecretToID = make(map[string]uint64)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					//LoadServers 加载服务器列表并根据ID排序
 | 
				
			||||||
 | 
					func LoadServers() {
 | 
				
			||||||
 | 
						InitServer()
 | 
				
			||||||
 | 
						var servers []model.Server
 | 
				
			||||||
 | 
						DB.Find(&servers)
 | 
				
			||||||
 | 
						for _, s := range servers {
 | 
				
			||||||
 | 
							innerS := s
 | 
				
			||||||
 | 
							innerS.Host = &model.Host{}
 | 
				
			||||||
 | 
							innerS.State = &model.HostState{}
 | 
				
			||||||
 | 
							ServerList[innerS.ID] = &innerS
 | 
				
			||||||
 | 
							SecretToID[innerS.Secret] = innerS.ID
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						ReSortServer()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前)
 | 
				
			||||||
 | 
					func ReSortServer() {
 | 
				
			||||||
 | 
						ServerLock.RLock()
 | 
				
			||||||
 | 
						defer ServerLock.RUnlock()
 | 
				
			||||||
 | 
						SortedServerLock.Lock()
 | 
				
			||||||
 | 
						defer SortedServerLock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						SortedServerList = []*model.Server{}
 | 
				
			||||||
 | 
						for _, s := range ServerList {
 | 
				
			||||||
 | 
							SortedServerList = append(SortedServerList, s)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// 按照服务器 ID 排序的具体实现(ID越大越靠前)
 | 
				
			||||||
 | 
						sort.SliceStable(SortedServerList, func(i, j int) bool {
 | 
				
			||||||
 | 
							if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex {
 | 
				
			||||||
 | 
								return SortedServerList[i].ID < SortedServerList[j].ID
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -1,18 +1,13 @@
 | 
				
			|||||||
package singleton
 | 
					package singleton
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"gorm.io/driver/sqlite"
 | 
				
			||||||
	"sort"
 | 
					 | 
				
			||||||
	"sync"
 | 
					 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/patrickmn/go-cache"
 | 
						"github.com/patrickmn/go-cache"
 | 
				
			||||||
	"github.com/robfig/cron/v3"
 | 
					 | 
				
			||||||
	"gorm.io/gorm"
 | 
						"gorm.io/gorm"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/naiba/nezha/model"
 | 
						"github.com/naiba/nezha/model"
 | 
				
			||||||
	"github.com/naiba/nezha/pkg/utils"
 | 
					 | 
				
			||||||
	pb "github.com/naiba/nezha/proto"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var Version = "v0.12.18" // !!记得修改 README 中的 badge 版本!!
 | 
					var Version = "v0.12.18" // !!记得修改 README 中的 badge 版本!!
 | 
				
			||||||
@ -22,86 +17,52 @@ var (
 | 
				
			|||||||
	Cache *cache.Cache
 | 
						Cache *cache.Cache
 | 
				
			||||||
	DB    *gorm.DB
 | 
						DB    *gorm.DB
 | 
				
			||||||
	Loc   *time.Location
 | 
						Loc   *time.Location
 | 
				
			||||||
 | 
					 | 
				
			||||||
	ServerList map[uint64]*model.Server // [ServerID] -> model.Server
 | 
					 | 
				
			||||||
	SecretToID map[string]uint64        // [ServerSecret] -> ServerID
 | 
					 | 
				
			||||||
	ServerLock sync.RWMutex
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序
 | 
					 | 
				
			||||||
	SortedServerLock sync.RWMutex
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Init 初始化时区为上海时区
 | 
					// Init 初始化singleton
 | 
				
			||||||
func Init() {
 | 
					func Init() {
 | 
				
			||||||
 | 
						// 初始化时区至上海 UTF+8
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	Loc, err = time.LoadLocation("Asia/Shanghai")
 | 
						Loc, err = time.LoadLocation("Asia/Shanghai")
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		panic(err)
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						Conf = &model.Config{}
 | 
				
			||||||
 | 
						Cache = cache.New(5*time.Minute, 10*time.Minute)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前)
 | 
					// LoadSingleton 加载子服务并执行
 | 
				
			||||||
func ReSortServer() {
 | 
					func LoadSingleton() {
 | 
				
			||||||
	ServerLock.RLock()
 | 
						LoadNotifications() // 加载通知服务
 | 
				
			||||||
	defer ServerLock.RUnlock()
 | 
						LoadServers()       // 加载服务器列表
 | 
				
			||||||
	SortedServerLock.Lock()
 | 
						LoadCronTasks()     // 加载定时任务
 | 
				
			||||||
	defer SortedServerLock.Unlock()
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	SortedServerList = []*model.Server{}
 | 
					// InitConfigFromPath 从给出的文件路径中加载配置
 | 
				
			||||||
	for _, s := range ServerList {
 | 
					func InitConfigFromPath(path string) {
 | 
				
			||||||
		SortedServerList = append(SortedServerList, s)
 | 
						err := Conf.Read(path)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// 按照服务器 ID 排序的具体实现(ID越大越靠前)
 | 
					// InitDBFromPath 从给出的文件路径中加载数据库
 | 
				
			||||||
	sort.SliceStable(SortedServerList, func(i, j int) bool {
 | 
					func InitDBFromPath(path string) {
 | 
				
			||||||
		if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex {
 | 
						var err error
 | 
				
			||||||
			return SortedServerList[i].ID < SortedServerList[j].ID
 | 
						DB, err = gorm.Open(sqlite.Open(path), &gorm.Config{
 | 
				
			||||||
		}
 | 
							CreateBatchSize: 200,
 | 
				
			||||||
		return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex
 | 
					 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
}
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
// =============== Cron Mixin ===============
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
var CronLock sync.RWMutex
 | 
					 | 
				
			||||||
var Crons map[uint64]*model.Cron
 | 
					 | 
				
			||||||
var Cron *cron.Cron
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func ManualTrigger(c model.Cron) {
 | 
					 | 
				
			||||||
	CronTrigger(c)()
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func CronTrigger(cr model.Cron) func() {
 | 
					 | 
				
			||||||
	crIgnoreMap := make(map[uint64]bool)
 | 
					 | 
				
			||||||
	for j := 0; j < len(cr.Servers); j++ {
 | 
					 | 
				
			||||||
		crIgnoreMap[cr.Servers[j]] = true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return func() {
 | 
					 | 
				
			||||||
		ServerLock.RLock()
 | 
					 | 
				
			||||||
		defer ServerLock.RUnlock()
 | 
					 | 
				
			||||||
		for _, s := range ServerList {
 | 
					 | 
				
			||||||
			if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] {
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			if s.TaskStream != nil {
 | 
					 | 
				
			||||||
				s.TaskStream.Send(&pb.Task{
 | 
					 | 
				
			||||||
					Id:   cr.ID,
 | 
					 | 
				
			||||||
					Data: cr.Command,
 | 
					 | 
				
			||||||
					Type: model.TaskTypeCommand,
 | 
					 | 
				
			||||||
				})
 | 
					 | 
				
			||||||
			} else {
 | 
					 | 
				
			||||||
				SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if Conf.Debug {
 | 
				
			||||||
 | 
							DB = DB.Debug()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						err = DB.AutoMigrate(model.Server{}, model.User{},
 | 
				
			||||||
 | 
							model.Notification{}, model.AlertRule{}, model.Monitor{},
 | 
				
			||||||
 | 
							model.MonitorHistory{}, model.Cron{}, model.Transfer{})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							panic(err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
func IPDesensitize(ip string) string {
 | 
					 | 
				
			||||||
	if Conf.EnablePlainIPInNotification {
 | 
					 | 
				
			||||||
		return ip
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return utils.IPDesensitize(ip)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										95
									
								
								service/singleton/toolfunc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										95
									
								
								service/singleton/toolfunc.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,95 @@
 | 
				
			|||||||
 | 
					package singleton
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"github.com/naiba/nezha/model"
 | 
				
			||||||
 | 
						"github.com/naiba/nezha/pkg/utils"
 | 
				
			||||||
 | 
						"log"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
						该文件保存了一些工具函数
 | 
				
			||||||
 | 
						RecordTransferHourlyUsage	对流量记录进行打点
 | 
				
			||||||
 | 
						CleanMonitorHistory			清理无效或过时的 监控记录 和 流量记录
 | 
				
			||||||
 | 
						IPDesensitize				根据用户设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP)
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// RecordTransferHourlyUsage 对流量记录进行打点
 | 
				
			||||||
 | 
					func RecordTransferHourlyUsage() {
 | 
				
			||||||
 | 
						ServerLock.Lock()
 | 
				
			||||||
 | 
						defer ServerLock.Unlock()
 | 
				
			||||||
 | 
						now := time.Now()
 | 
				
			||||||
 | 
						nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 | 
				
			||||||
 | 
						var txs []model.Transfer
 | 
				
			||||||
 | 
						for id, server := range ServerList {
 | 
				
			||||||
 | 
							tx := model.Transfer{
 | 
				
			||||||
 | 
								ServerID: id,
 | 
				
			||||||
 | 
								In:       server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn),
 | 
				
			||||||
 | 
								Out:      server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut),
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if tx.In == 0 && tx.Out == 0 {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							server.PrevHourlyTransferIn = int64(server.State.NetInTransfer)
 | 
				
			||||||
 | 
							server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer)
 | 
				
			||||||
 | 
							tx.CreatedAt = nowTrimSeconds
 | 
				
			||||||
 | 
							txs = append(txs, tx)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(txs) == 0 {
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						log.Println("NEZHA>> Cron 流量统计入库", len(txs), DB.Create(txs).Error)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// CleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录
 | 
				
			||||||
 | 
					func CleanMonitorHistory() {
 | 
				
			||||||
 | 
						// 清理已被删除的服务器的监控记录与流量记录
 | 
				
			||||||
 | 
						DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30))
 | 
				
			||||||
 | 
						DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)")
 | 
				
			||||||
 | 
						// 计算可清理流量记录的时长
 | 
				
			||||||
 | 
						var allServerKeep time.Time
 | 
				
			||||||
 | 
						specialServerKeep := make(map[uint64]time.Time)
 | 
				
			||||||
 | 
						var specialServerIDs []uint64
 | 
				
			||||||
 | 
						var alerts []model.AlertRule
 | 
				
			||||||
 | 
						DB.Find(&alerts)
 | 
				
			||||||
 | 
						for i := 0; i < len(alerts); i++ {
 | 
				
			||||||
 | 
							for j := 0; j < len(alerts[i].Rules); j++ {
 | 
				
			||||||
 | 
								// 是不是流量记录规则
 | 
				
			||||||
 | 
								if !alerts[i].Rules[j].IsTransferDurationRule() {
 | 
				
			||||||
 | 
									continue
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart()
 | 
				
			||||||
 | 
								// 判断规则影响的机器范围
 | 
				
			||||||
 | 
								if alerts[i].Rules[j].Cover == model.RuleCoverAll {
 | 
				
			||||||
 | 
									// 更新全局可以清理的数据点
 | 
				
			||||||
 | 
									if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) {
 | 
				
			||||||
 | 
										allServerKeep = dataCouldRemoveBefore
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									// 更新特定机器可以清理数据点
 | 
				
			||||||
 | 
									for id := range alerts[i].Rules[j].Ignore {
 | 
				
			||||||
 | 
										if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) {
 | 
				
			||||||
 | 
											specialServerKeep[id] = dataCouldRemoveBefore
 | 
				
			||||||
 | 
											specialServerIDs = append(specialServerIDs, id)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for id, couldRemove := range specialServerKeep {
 | 
				
			||||||
 | 
							DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if allServerKeep.IsZero() {
 | 
				
			||||||
 | 
							DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs)
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IPDesensitize 根据设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP)
 | 
				
			||||||
 | 
					func IPDesensitize(ip string) string {
 | 
				
			||||||
 | 
						if Conf.EnablePlainIPInNotification {
 | 
				
			||||||
 | 
							return ip
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return utils.IPDesensitize(ip)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user