Merge pull request #158 from AkkiaS7/master
optimize:移除两处冗余的代码 + refactor:优化代码组织结构 Co-authored-by: AkkiaS7 <68485070+AkkiaS7@users.noreply.github.com>
This commit is contained in:
		
						commit
						c8933e76e4
					
				@ -38,16 +38,16 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type AgentCliParam struct {
 | 
			
		||||
	SkipConnectionCount   bool
 | 
			
		||||
	SkipProcsCount        bool
 | 
			
		||||
	DisableAutoUpdate     bool
 | 
			
		||||
	DisableForceUpdate    bool
 | 
			
		||||
	DisableCommandExecute bool
 | 
			
		||||
	Debug                 bool
 | 
			
		||||
	Server                string
 | 
			
		||||
	ClientSecret          string
 | 
			
		||||
	ReportDelay           int
 | 
			
		||||
	TLS                   bool
 | 
			
		||||
	SkipConnectionCount   bool   // 跳过连接数检查
 | 
			
		||||
	SkipProcsCount        bool   // 跳过进程数量检查
 | 
			
		||||
	DisableAutoUpdate     bool   // 关闭自动更新
 | 
			
		||||
	DisableForceUpdate    bool   // 关闭强制更新
 | 
			
		||||
	DisableCommandExecute bool   // 关闭命令执行
 | 
			
		||||
	Debug                 bool   // debug模式
 | 
			
		||||
	Server                string // 服务器地址
 | 
			
		||||
	ClientSecret          string // 客户端密钥
 | 
			
		||||
	ReportDelay           int    // 报告间隔
 | 
			
		||||
	TLS                   bool   // 是否使用TLS加密传输至服务端
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
@ -60,7 +60,6 @@ var (
 | 
			
		||||
var (
 | 
			
		||||
	agentCliParam AgentCliParam
 | 
			
		||||
	agentConfig   model.AgentConfig
 | 
			
		||||
	updateCh      = make(chan struct{}) // Agent 自动更新间隔
 | 
			
		||||
	httpClient    = &http.Client{
 | 
			
		||||
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
 | 
			
		||||
			return http.ErrUseLastResponse
 | 
			
		||||
@ -86,6 +85,7 @@ func init() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	// windows环境处理
 | 
			
		||||
	if runtime.GOOS == "windows" {
 | 
			
		||||
		hostArch, err := host.KernelArch()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@ -108,6 +108,7 @@ func main() {
 | 
			
		||||
	// 来自于 GoReleaser 的版本号
 | 
			
		||||
	monitor.Version = version
 | 
			
		||||
 | 
			
		||||
	// 初始化运行参数
 | 
			
		||||
	var isEditAgentConfig bool
 | 
			
		||||
	flag.BoolVarP(&agentCliParam.Debug, "debug", "d", false, "开启调试信息")
 | 
			
		||||
	flag.BoolVarP(&isEditAgentConfig, "edit-agent-config", "", false, "修改要监控的网卡/分区白名单")
 | 
			
		||||
@ -145,6 +146,7 @@ func run() {
 | 
			
		||||
		ClientSecret: agentCliParam.ClientSecret,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 下载远程命令执行需要的终端
 | 
			
		||||
	if !agentCliParam.DisableCommandExecute {
 | 
			
		||||
		go pty.DownloadDependency()
 | 
			
		||||
	}
 | 
			
		||||
@ -153,19 +155,14 @@ func run() {
 | 
			
		||||
	// 更新IP信息
 | 
			
		||||
	go monitor.UpdateIP()
 | 
			
		||||
 | 
			
		||||
	// 定时检查更新
 | 
			
		||||
	if _, err := semver.Parse(version); err == nil && !agentCliParam.DisableAutoUpdate {
 | 
			
		||||
		doSelfUpdate(true)
 | 
			
		||||
		go func() {
 | 
			
		||||
			for range updateCh {
 | 
			
		||||
				go func() {
 | 
			
		||||
					defer func() {
 | 
			
		||||
						time.Sleep(time.Minute * 20)
 | 
			
		||||
						updateCh <- struct{}{}
 | 
			
		||||
					}()
 | 
			
		||||
					doSelfUpdate(true)
 | 
			
		||||
				}()
 | 
			
		||||
			for range time.Tick(20 * time.Minute) {
 | 
			
		||||
				doSelfUpdate(true)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
		updateCh <- struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
@ -267,6 +264,7 @@ func doTask(task *pb.Task) {
 | 
			
		||||
	client.ReportTask(context.Background(), &result)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// reportState 向server上报状态信息
 | 
			
		||||
func reportState() {
 | 
			
		||||
	var lastReportHostInfo time.Time
 | 
			
		||||
	var err error
 | 
			
		||||
@ -282,6 +280,7 @@ func reportState() {
 | 
			
		||||
				println("reportState error", err)
 | 
			
		||||
				time.Sleep(delayWhenError)
 | 
			
		||||
			}
 | 
			
		||||
			// 每10分钟重新获取一次硬件信息
 | 
			
		||||
			if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) {
 | 
			
		||||
				lastReportHostInfo = time.Now()
 | 
			
		||||
				client.ReportSystemInfo(context.Background(), monitor.GetHost(&agentConfig).PB())
 | 
			
		||||
@ -291,6 +290,7 @@ func reportState() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// doSelfUpdate 执行更新检查 如果更新成功则会结束进程
 | 
			
		||||
func doSelfUpdate(useLocalVersion bool) {
 | 
			
		||||
	v := semver.MustParse("0.1.0")
 | 
			
		||||
	if useLocalVersion {
 | 
			
		||||
@ -303,6 +303,7 @@ func doSelfUpdate(useLocalVersion bool) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	if !latest.Version.Equals(v) {
 | 
			
		||||
		println("已经更新至:", latest.Version, " 正在结束进程")
 | 
			
		||||
		os.Exit(1)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -500,6 +501,7 @@ func handleTerminalTask(task *pb.Task) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// 修改Agent要监控的网卡与硬盘分区
 | 
			
		||||
func editAgentConfig() {
 | 
			
		||||
	nc, err := psnet.IOCounters(true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 | 
			
		||||
@ -37,6 +37,7 @@ var (
 | 
			
		||||
	cachedBootTime                                                             time.Time
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// GetHost 获取主机硬件信息
 | 
			
		||||
func GetHost(agentConfig *model.AgentConfig) *model.Host {
 | 
			
		||||
	hi, _ := host.Info()
 | 
			
		||||
	var cpuType string
 | 
			
		||||
@ -155,6 +156,7 @@ func GetState(agentConfig *model.AgentConfig, skipConnectionCount bool, skipProc
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TrackNetworkSpeed NIC监控,统计流量与速度
 | 
			
		||||
func TrackNetworkSpeed(agentConfig *model.AgentConfig) {
 | 
			
		||||
	var innerNetInTransfer, innerNetOutTransfer uint64
 | 
			
		||||
	nc, err := net.IOCounters(true)
 | 
			
		||||
 | 
			
		||||
@ -51,6 +51,7 @@ var (
 | 
			
		||||
	httpClientV6            = utils.NewSingleStackHTTPClient(time.Second*20, time.Second*5, time.Second*10, true)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// UpdateIP 每30分钟更新一次IP地址与国家码的缓存
 | 
			
		||||
func UpdateIP() {
 | 
			
		||||
	for {
 | 
			
		||||
		ipv4 := fetchGeoIP(geoIPApiList, false)
 | 
			
		||||
 | 
			
		||||
@ -1,201 +1,40 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/ory/graceful"
 | 
			
		||||
	"github.com/patrickmn/go-cache"
 | 
			
		||||
	"github.com/robfig/cron/v3"
 | 
			
		||||
	"gorm.io/driver/sqlite"
 | 
			
		||||
	"gorm.io/gorm"
 | 
			
		||||
 | 
			
		||||
	"github.com/naiba/nezha/cmd/dashboard/controller"
 | 
			
		||||
	"github.com/naiba/nezha/cmd/dashboard/rpc"
 | 
			
		||||
	"github.com/naiba/nezha/model"
 | 
			
		||||
	"github.com/naiba/nezha/service/singleton"
 | 
			
		||||
	"github.com/ory/graceful"
 | 
			
		||||
	"log"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	shanghai, err := time.LoadLocation("Asia/Shanghai")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 初始化 dao 包
 | 
			
		||||
	singleton.Init()
 | 
			
		||||
	singleton.Conf = &model.Config{}
 | 
			
		||||
	singleton.Cron = cron.New(cron.WithSeconds(), cron.WithLocation(shanghai))
 | 
			
		||||
	singleton.Crons = make(map[uint64]*model.Cron)
 | 
			
		||||
	singleton.ServerList = make(map[uint64]*model.Server)
 | 
			
		||||
	singleton.SecretToID = make(map[string]uint64)
 | 
			
		||||
 | 
			
		||||
	err = singleton.Conf.Read("data/config.yaml")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	singleton.DB, err = gorm.Open(sqlite.Open("data/sqlite.db"), &gorm.Config{
 | 
			
		||||
		CreateBatchSize: 200,
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	if singleton.Conf.Debug {
 | 
			
		||||
		singleton.DB = singleton.DB.Debug()
 | 
			
		||||
	}
 | 
			
		||||
	if singleton.Conf.GRPCPort == 0 {
 | 
			
		||||
		singleton.Conf.GRPCPort = 5555
 | 
			
		||||
	}
 | 
			
		||||
	singleton.Cache = cache.New(5*time.Minute, 10*time.Minute)
 | 
			
		||||
 | 
			
		||||
	singleton.InitConfigFromPath("data/config.yaml")
 | 
			
		||||
	singleton.InitDBFromPath("data/sqlite.db")
 | 
			
		||||
	initSystem()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func initSystem() {
 | 
			
		||||
	singleton.DB.AutoMigrate(model.Server{}, model.User{},
 | 
			
		||||
		model.Notification{}, model.AlertRule{}, model.Monitor{},
 | 
			
		||||
		model.MonitorHistory{}, model.Cron{}, model.Transfer{})
 | 
			
		||||
 | 
			
		||||
	singleton.LoadNotifications()
 | 
			
		||||
	loadServers() //加载服务器列表
 | 
			
		||||
	loadCrons()   //加载计划任务
 | 
			
		||||
	// 启动 singleton 包下的所有服务
 | 
			
		||||
	singleton.LoadSingleton()
 | 
			
		||||
 | 
			
		||||
	// 每天的3:30 对 监控记录 和 流量记录 进行清理
 | 
			
		||||
	_, err := singleton.Cron.AddFunc("0 30 3 * * *", cleanMonitorHistory)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
	if _, err := singleton.Cron.AddFunc("0 30 3 * * *", singleton.CleanMonitorHistory); err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 每小时对流量记录进行打点
 | 
			
		||||
	_, err = singleton.Cron.AddFunc("0 0 * * * *", recordTransferHourlyUsage)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
	if _, err := singleton.Cron.AddFunc("0 0 * * * *", singleton.RecordTransferHourlyUsage); err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// recordTransferHourlyUsage 对流量记录进行打点
 | 
			
		||||
func recordTransferHourlyUsage() {
 | 
			
		||||
	singleton.ServerLock.Lock()
 | 
			
		||||
	defer singleton.ServerLock.Unlock()
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 | 
			
		||||
	var txs []model.Transfer
 | 
			
		||||
	for id, server := range singleton.ServerList {
 | 
			
		||||
		tx := model.Transfer{
 | 
			
		||||
			ServerID: id,
 | 
			
		||||
			In:       server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn),
 | 
			
		||||
			Out:      server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut),
 | 
			
		||||
		}
 | 
			
		||||
		if tx.In == 0 && tx.Out == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		server.PrevHourlyTransferIn = int64(server.State.NetInTransfer)
 | 
			
		||||
		server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer)
 | 
			
		||||
		tx.CreatedAt = nowTrimSeconds
 | 
			
		||||
		txs = append(txs, tx)
 | 
			
		||||
	}
 | 
			
		||||
	if len(txs) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	log.Println("NEZHA>> Cron 流量统计入库", len(txs), singleton.DB.Create(txs).Error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// cleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录
 | 
			
		||||
func cleanMonitorHistory() {
 | 
			
		||||
	// 清理已被删除的服务器的监控记录与流量记录
 | 
			
		||||
	singleton.DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30))
 | 
			
		||||
	singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)")
 | 
			
		||||
	// 计算可清理流量记录的时长
 | 
			
		||||
	var allServerKeep time.Time
 | 
			
		||||
	specialServerKeep := make(map[uint64]time.Time)
 | 
			
		||||
	var specialServerIDs []uint64
 | 
			
		||||
	var alerts []model.AlertRule
 | 
			
		||||
	singleton.DB.Find(&alerts)
 | 
			
		||||
	for i := 0; i < len(alerts); i++ {
 | 
			
		||||
		for j := 0; j < len(alerts[i].Rules); j++ {
 | 
			
		||||
			// 是不是流量记录规则
 | 
			
		||||
			if !alerts[i].Rules[j].IsTransferDurationRule() {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart()
 | 
			
		||||
			// 判断规则影响的机器范围
 | 
			
		||||
			if alerts[i].Rules[j].Cover == model.RuleCoverAll {
 | 
			
		||||
				// 更新全局可以清理的数据点
 | 
			
		||||
				if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) {
 | 
			
		||||
					allServerKeep = dataCouldRemoveBefore
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				// 更新特定机器可以清理数据点
 | 
			
		||||
				for id := range alerts[i].Rules[j].Ignore {
 | 
			
		||||
					if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) {
 | 
			
		||||
						specialServerKeep[id] = dataCouldRemoveBefore
 | 
			
		||||
						specialServerIDs = append(specialServerIDs, id)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for id, couldRemove := range specialServerKeep {
 | 
			
		||||
		singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove)
 | 
			
		||||
	}
 | 
			
		||||
	if allServerKeep.IsZero() {
 | 
			
		||||
		singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs)
 | 
			
		||||
	} else {
 | 
			
		||||
		singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//loadServers 加载服务器列表并根据ID排序
 | 
			
		||||
func loadServers() {
 | 
			
		||||
	var servers []model.Server
 | 
			
		||||
	singleton.DB.Find(&servers)
 | 
			
		||||
	for _, s := range servers {
 | 
			
		||||
		innerS := s
 | 
			
		||||
		innerS.Host = &model.Host{}
 | 
			
		||||
		innerS.State = &model.HostState{}
 | 
			
		||||
		singleton.ServerList[innerS.ID] = &innerS
 | 
			
		||||
		singleton.SecretToID[innerS.Secret] = innerS.ID
 | 
			
		||||
	}
 | 
			
		||||
	singleton.ReSortServer()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// loadCrons 加载计划任务
 | 
			
		||||
func loadCrons() {
 | 
			
		||||
	var crons []model.Cron
 | 
			
		||||
	singleton.DB.Find(&crons)
 | 
			
		||||
	var err error
 | 
			
		||||
	errMsg := new(bytes.Buffer)
 | 
			
		||||
	for i := 0; i < len(crons); i++ {
 | 
			
		||||
		cr := crons[i]
 | 
			
		||||
 | 
			
		||||
		crIgnoreMap := make(map[uint64]bool)
 | 
			
		||||
		for j := 0; j < len(cr.Servers); j++ {
 | 
			
		||||
			crIgnoreMap[cr.Servers[j]] = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// 注册计划任务
 | 
			
		||||
		cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr))
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			singleton.Crons[cr.ID] = &cr
 | 
			
		||||
		} else {
 | 
			
		||||
			if errMsg.Len() == 0 {
 | 
			
		||||
				errMsg.WriteString("调度失败的计划任务:[")
 | 
			
		||||
			}
 | 
			
		||||
			errMsg.WriteString(fmt.Sprintf("%d,", cr.ID))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if errMsg.Len() > 0 {
 | 
			
		||||
		msg := errMsg.String()
 | 
			
		||||
		singleton.SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false)
 | 
			
		||||
	}
 | 
			
		||||
	singleton.Cron.Start()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	cleanMonitorHistory()
 | 
			
		||||
	singleton.CleanMonitorHistory()
 | 
			
		||||
	go rpc.ServeRPC(singleton.Conf.GRPCPort)
 | 
			
		||||
	serviceSentinelDispatchBus := make(chan model.Monitor) // 用于传递服务监控任务信息的channel
 | 
			
		||||
	go rpc.DispatchTask(serviceSentinelDispatchBus)
 | 
			
		||||
@ -207,7 +46,7 @@ func main() {
 | 
			
		||||
		return srv.ListenAndServe()
 | 
			
		||||
	}, func(c context.Context) error {
 | 
			
		||||
		log.Println("NEZHA>> Graceful::START")
 | 
			
		||||
		recordTransferHourlyUsage()
 | 
			
		||||
		singleton.RecordTransferHourlyUsage()
 | 
			
		||||
		log.Println("NEZHA>> Graceful::END")
 | 
			
		||||
		srv.Shutdown(c)
 | 
			
		||||
		return nil
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ type AgentConfig struct {
 | 
			
		||||
	v                           *viper.Viper
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Read 从给定的文件目录加载配置文件
 | 
			
		||||
func (c *AgentConfig) Read(path string) error {
 | 
			
		||||
	c.v = viper.New()
 | 
			
		||||
	c.v.SetConfigFile(path)
 | 
			
		||||
@ -98,6 +99,9 @@ func (c *Config) Read(path string) error {
 | 
			
		||||
	if c.Site.Theme == "" {
 | 
			
		||||
		c.Site.Theme = "default"
 | 
			
		||||
	}
 | 
			
		||||
	if c.GRPCPort == 0 {
 | 
			
		||||
		c.GRPCPort = 5555
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	c.updateIgnoredIPNotificationID()
 | 
			
		||||
	return nil
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										81
									
								
								service/singleton/crontask.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										81
									
								
								service/singleton/crontask.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,81 @@
 | 
			
		||||
package singleton
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/naiba/nezha/model"
 | 
			
		||||
	pb "github.com/naiba/nezha/proto"
 | 
			
		||||
	"github.com/robfig/cron/v3"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	Cron     *cron.Cron
 | 
			
		||||
	Crons    map[uint64]*model.Cron
 | 
			
		||||
	CronLock sync.RWMutex
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func InitCronTask() {
 | 
			
		||||
	Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc))
 | 
			
		||||
	Crons = make(map[uint64]*model.Cron)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LoadCronTasks 加载计划任务
 | 
			
		||||
func LoadCronTasks() {
 | 
			
		||||
	InitCronTask()
 | 
			
		||||
	var crons []model.Cron
 | 
			
		||||
	DB.Find(&crons)
 | 
			
		||||
	var err error
 | 
			
		||||
	errMsg := new(bytes.Buffer)
 | 
			
		||||
	for i := 0; i < len(crons); i++ {
 | 
			
		||||
		cr := crons[i]
 | 
			
		||||
 | 
			
		||||
		// 注册计划任务
 | 
			
		||||
		cr.CronJobID, err = Cron.AddFunc(cr.Scheduler, CronTrigger(cr))
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			Crons[cr.ID] = &cr
 | 
			
		||||
		} else {
 | 
			
		||||
			if errMsg.Len() == 0 {
 | 
			
		||||
				errMsg.WriteString("调度失败的计划任务:[")
 | 
			
		||||
			}
 | 
			
		||||
			errMsg.WriteString(fmt.Sprintf("%d,", cr.ID))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if errMsg.Len() > 0 {
 | 
			
		||||
		msg := errMsg.String()
 | 
			
		||||
		SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false)
 | 
			
		||||
	}
 | 
			
		||||
	Cron.Start()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ManualTrigger(c model.Cron) {
 | 
			
		||||
	CronTrigger(c)()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CronTrigger(cr model.Cron) func() {
 | 
			
		||||
	crIgnoreMap := make(map[uint64]bool)
 | 
			
		||||
	for j := 0; j < len(cr.Servers); j++ {
 | 
			
		||||
		crIgnoreMap[cr.Servers[j]] = true
 | 
			
		||||
	}
 | 
			
		||||
	return func() {
 | 
			
		||||
		ServerLock.RLock()
 | 
			
		||||
		defer ServerLock.RUnlock()
 | 
			
		||||
		for _, s := range ServerList {
 | 
			
		||||
			if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if s.TaskStream != nil {
 | 
			
		||||
				s.TaskStream.Send(&pb.Task{
 | 
			
		||||
					Id:   cr.ID,
 | 
			
		||||
					Data: cr.Command,
 | 
			
		||||
					Type: model.TaskTypeCommand,
 | 
			
		||||
				})
 | 
			
		||||
			} else {
 | 
			
		||||
				SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -16,7 +16,7 @@ const firstNotificationDelay = time.Minute * 15
 | 
			
		||||
var notifications []model.Notification
 | 
			
		||||
var notificationsLock sync.RWMutex
 | 
			
		||||
 | 
			
		||||
// LoadNotifications 加载通知方式到 singleton.notifications 变量
 | 
			
		||||
// LoadNotifications 从 DB 加载通知方式到 singleton.notifications 变量
 | 
			
		||||
func LoadNotifications() {
 | 
			
		||||
	notificationsLock.Lock()
 | 
			
		||||
	if err := DB.Find(¬ifications).Error; err != nil {
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										58
									
								
								service/singleton/server.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										58
									
								
								service/singleton/server.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,58 @@
 | 
			
		||||
package singleton
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/naiba/nezha/model"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	ServerList map[uint64]*model.Server // [ServerID] -> model.Server
 | 
			
		||||
	SecretToID map[string]uint64        // [ServerSecret] -> ServerID
 | 
			
		||||
	ServerLock sync.RWMutex
 | 
			
		||||
 | 
			
		||||
	SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序
 | 
			
		||||
	SortedServerLock sync.RWMutex
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// InitServer 初始化 ServerID <-> Secret 的映射
 | 
			
		||||
func InitServer() {
 | 
			
		||||
	ServerList = make(map[uint64]*model.Server)
 | 
			
		||||
	SecretToID = make(map[string]uint64)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//LoadServers 加载服务器列表并根据ID排序
 | 
			
		||||
func LoadServers() {
 | 
			
		||||
	InitServer()
 | 
			
		||||
	var servers []model.Server
 | 
			
		||||
	DB.Find(&servers)
 | 
			
		||||
	for _, s := range servers {
 | 
			
		||||
		innerS := s
 | 
			
		||||
		innerS.Host = &model.Host{}
 | 
			
		||||
		innerS.State = &model.HostState{}
 | 
			
		||||
		ServerList[innerS.ID] = &innerS
 | 
			
		||||
		SecretToID[innerS.Secret] = innerS.ID
 | 
			
		||||
	}
 | 
			
		||||
	ReSortServer()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前)
 | 
			
		||||
func ReSortServer() {
 | 
			
		||||
	ServerLock.RLock()
 | 
			
		||||
	defer ServerLock.RUnlock()
 | 
			
		||||
	SortedServerLock.Lock()
 | 
			
		||||
	defer SortedServerLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	SortedServerList = []*model.Server{}
 | 
			
		||||
	for _, s := range ServerList {
 | 
			
		||||
		SortedServerList = append(SortedServerList, s)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// 按照服务器 ID 排序的具体实现(ID越大越靠前)
 | 
			
		||||
	sort.SliceStable(SortedServerList, func(i, j int) bool {
 | 
			
		||||
		if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex {
 | 
			
		||||
			return SortedServerList[i].ID < SortedServerList[j].ID
 | 
			
		||||
		}
 | 
			
		||||
		return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
@ -149,7 +149,6 @@ func (ss *ServiceSentinel) loadMonitorHistory() {
 | 
			
		||||
	var err error
 | 
			
		||||
	ss.monitorsLock.Lock()
 | 
			
		||||
	defer ss.monitorsLock.Unlock()
 | 
			
		||||
	ss.monitors = make(map[uint64]*model.Monitor)
 | 
			
		||||
	for i := 0; i < len(monitors); i++ {
 | 
			
		||||
		task := *monitors[i]
 | 
			
		||||
		// 通过cron定时将服务监控任务传递给任务调度管道
 | 
			
		||||
 | 
			
		||||
@ -1,18 +1,13 @@
 | 
			
		||||
package singleton
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"gorm.io/driver/sqlite"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/patrickmn/go-cache"
 | 
			
		||||
	"github.com/robfig/cron/v3"
 | 
			
		||||
	"gorm.io/gorm"
 | 
			
		||||
 | 
			
		||||
	"github.com/naiba/nezha/model"
 | 
			
		||||
	"github.com/naiba/nezha/pkg/utils"
 | 
			
		||||
	pb "github.com/naiba/nezha/proto"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var Version = "v0.12.18" // !!记得修改 README 中的 badge 版本!!
 | 
			
		||||
@ -22,86 +17,52 @@ var (
 | 
			
		||||
	Cache *cache.Cache
 | 
			
		||||
	DB    *gorm.DB
 | 
			
		||||
	Loc   *time.Location
 | 
			
		||||
 | 
			
		||||
	ServerList map[uint64]*model.Server // [ServerID] -> model.Server
 | 
			
		||||
	SecretToID map[string]uint64        // [ServerSecret] -> ServerID
 | 
			
		||||
	ServerLock sync.RWMutex
 | 
			
		||||
 | 
			
		||||
	SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序
 | 
			
		||||
	SortedServerLock sync.RWMutex
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Init 初始化时区为上海时区
 | 
			
		||||
// Init 初始化singleton
 | 
			
		||||
func Init() {
 | 
			
		||||
	// 初始化时区至上海 UTF+8
 | 
			
		||||
	var err error
 | 
			
		||||
	Loc, err = time.LoadLocation("Asia/Shanghai")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	Conf = &model.Config{}
 | 
			
		||||
	Cache = cache.New(5*time.Minute, 10*time.Minute)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前)
 | 
			
		||||
func ReSortServer() {
 | 
			
		||||
	ServerLock.RLock()
 | 
			
		||||
	defer ServerLock.RUnlock()
 | 
			
		||||
	SortedServerLock.Lock()
 | 
			
		||||
	defer SortedServerLock.Unlock()
 | 
			
		||||
// LoadSingleton 加载子服务并执行
 | 
			
		||||
func LoadSingleton() {
 | 
			
		||||
	LoadNotifications() // 加载通知服务
 | 
			
		||||
	LoadServers()       // 加载服务器列表
 | 
			
		||||
	LoadCronTasks()     // 加载定时任务
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
	SortedServerList = []*model.Server{}
 | 
			
		||||
	for _, s := range ServerList {
 | 
			
		||||
		SortedServerList = append(SortedServerList, s)
 | 
			
		||||
// InitConfigFromPath 从给出的文件路径中加载配置
 | 
			
		||||
func InitConfigFromPath(path string) {
 | 
			
		||||
	err := Conf.Read(path)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
	// 按照服务器 ID 排序的具体实现(ID越大越靠前)
 | 
			
		||||
	sort.SliceStable(SortedServerList, func(i, j int) bool {
 | 
			
		||||
		if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex {
 | 
			
		||||
			return SortedServerList[i].ID < SortedServerList[j].ID
 | 
			
		||||
		}
 | 
			
		||||
		return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex
 | 
			
		||||
// InitDBFromPath 从给出的文件路径中加载数据库
 | 
			
		||||
func InitDBFromPath(path string) {
 | 
			
		||||
	var err error
 | 
			
		||||
	DB, err = gorm.Open(sqlite.Open(path), &gorm.Config{
 | 
			
		||||
		CreateBatchSize: 200,
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// =============== Cron Mixin ===============
 | 
			
		||||
 | 
			
		||||
var CronLock sync.RWMutex
 | 
			
		||||
var Crons map[uint64]*model.Cron
 | 
			
		||||
var Cron *cron.Cron
 | 
			
		||||
 | 
			
		||||
func ManualTrigger(c model.Cron) {
 | 
			
		||||
	CronTrigger(c)()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func CronTrigger(cr model.Cron) func() {
 | 
			
		||||
	crIgnoreMap := make(map[uint64]bool)
 | 
			
		||||
	for j := 0; j < len(cr.Servers); j++ {
 | 
			
		||||
		crIgnoreMap[cr.Servers[j]] = true
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return func() {
 | 
			
		||||
		ServerLock.RLock()
 | 
			
		||||
		defer ServerLock.RUnlock()
 | 
			
		||||
		for _, s := range ServerList {
 | 
			
		||||
			if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if s.TaskStream != nil {
 | 
			
		||||
				s.TaskStream.Send(&pb.Task{
 | 
			
		||||
					Id:   cr.ID,
 | 
			
		||||
					Data: cr.Command,
 | 
			
		||||
					Type: model.TaskTypeCommand,
 | 
			
		||||
				})
 | 
			
		||||
			} else {
 | 
			
		||||
				SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	if Conf.Debug {
 | 
			
		||||
		DB = DB.Debug()
 | 
			
		||||
	}
 | 
			
		||||
	err = DB.AutoMigrate(model.Server{}, model.User{},
 | 
			
		||||
		model.Notification{}, model.AlertRule{}, model.Monitor{},
 | 
			
		||||
		model.MonitorHistory{}, model.Cron{}, model.Transfer{})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func IPDesensitize(ip string) string {
 | 
			
		||||
	if Conf.EnablePlainIPInNotification {
 | 
			
		||||
		return ip
 | 
			
		||||
	}
 | 
			
		||||
	return utils.IPDesensitize(ip)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										95
									
								
								service/singleton/toolfunc.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										95
									
								
								service/singleton/toolfunc.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,95 @@
 | 
			
		||||
package singleton
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/naiba/nezha/model"
 | 
			
		||||
	"github.com/naiba/nezha/pkg/utils"
 | 
			
		||||
	"log"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
	该文件保存了一些工具函数
 | 
			
		||||
	RecordTransferHourlyUsage	对流量记录进行打点
 | 
			
		||||
	CleanMonitorHistory			清理无效或过时的 监控记录 和 流量记录
 | 
			
		||||
	IPDesensitize				根据用户设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP)
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
// RecordTransferHourlyUsage 对流量记录进行打点
 | 
			
		||||
func RecordTransferHourlyUsage() {
 | 
			
		||||
	ServerLock.Lock()
 | 
			
		||||
	defer ServerLock.Unlock()
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
 | 
			
		||||
	var txs []model.Transfer
 | 
			
		||||
	for id, server := range ServerList {
 | 
			
		||||
		tx := model.Transfer{
 | 
			
		||||
			ServerID: id,
 | 
			
		||||
			In:       server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn),
 | 
			
		||||
			Out:      server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut),
 | 
			
		||||
		}
 | 
			
		||||
		if tx.In == 0 && tx.Out == 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		server.PrevHourlyTransferIn = int64(server.State.NetInTransfer)
 | 
			
		||||
		server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer)
 | 
			
		||||
		tx.CreatedAt = nowTrimSeconds
 | 
			
		||||
		txs = append(txs, tx)
 | 
			
		||||
	}
 | 
			
		||||
	if len(txs) == 0 {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	log.Println("NEZHA>> Cron 流量统计入库", len(txs), DB.Create(txs).Error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// CleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录
 | 
			
		||||
func CleanMonitorHistory() {
 | 
			
		||||
	// 清理已被删除的服务器的监控记录与流量记录
 | 
			
		||||
	DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30))
 | 
			
		||||
	DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)")
 | 
			
		||||
	// 计算可清理流量记录的时长
 | 
			
		||||
	var allServerKeep time.Time
 | 
			
		||||
	specialServerKeep := make(map[uint64]time.Time)
 | 
			
		||||
	var specialServerIDs []uint64
 | 
			
		||||
	var alerts []model.AlertRule
 | 
			
		||||
	DB.Find(&alerts)
 | 
			
		||||
	for i := 0; i < len(alerts); i++ {
 | 
			
		||||
		for j := 0; j < len(alerts[i].Rules); j++ {
 | 
			
		||||
			// 是不是流量记录规则
 | 
			
		||||
			if !alerts[i].Rules[j].IsTransferDurationRule() {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart()
 | 
			
		||||
			// 判断规则影响的机器范围
 | 
			
		||||
			if alerts[i].Rules[j].Cover == model.RuleCoverAll {
 | 
			
		||||
				// 更新全局可以清理的数据点
 | 
			
		||||
				if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) {
 | 
			
		||||
					allServerKeep = dataCouldRemoveBefore
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				// 更新特定机器可以清理数据点
 | 
			
		||||
				for id := range alerts[i].Rules[j].Ignore {
 | 
			
		||||
					if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) {
 | 
			
		||||
						specialServerKeep[id] = dataCouldRemoveBefore
 | 
			
		||||
						specialServerIDs = append(specialServerIDs, id)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for id, couldRemove := range specialServerKeep {
 | 
			
		||||
		DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove)
 | 
			
		||||
	}
 | 
			
		||||
	if allServerKeep.IsZero() {
 | 
			
		||||
		DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs)
 | 
			
		||||
	} else {
 | 
			
		||||
		DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// IPDesensitize 根据设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP)
 | 
			
		||||
func IPDesensitize(ip string) string {
 | 
			
		||||
	if Conf.EnablePlainIPInNotification {
 | 
			
		||||
		return ip
 | 
			
		||||
	}
 | 
			
		||||
	return utils.IPDesensitize(ip)
 | 
			
		||||
}
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user