lug 是 sjtug mirror 最为核心一个组件。其主要功能是定时跑脚本;功能很简单,代码量不少,结构很是清晰。

├── Dockerfile
├── LICENSE
├── README.md
├── cli/lug
│   ├── license.go
│   └── main.go
├── go.mod
├── go.sum
└── pkg
    ├── config
    │   └── config.go
    ├── exporter
    │   └── exporter.go
    ├── helper
    │   ├── disk_usage.go
    │   └── max_length_slice.go
    ├── manager
    │   ├── json_rest.go
    │   └── manager.go
    └── worker
        ├── executor.go
        ├── executor_invoke_worker.go
        ├── external_worker.go
        ├── shell_script_executor.go
        ├── utilities.go
        ├── utility_rlimit.go
        └── worker.go

先从入口开始吧。入口位于 cli/lug,licence.goscripts/gen_license.sh (从被 git ignore 的 vendor/)生成,看起来像是圣遗物…主要的还是 main.go

命令行解析用了 pflag

日志用了 logrus 作为日志库, logrustash 作为兼容 Logstash 的日志 hook,从而为 Grafana 所用。它通过 cfg.LogStashConfig 配置文件项通过 prepareLogger 函数启动。

命令行解析、配置解析、日志启动的部分放在了 init 里,按下不表。

主函数就直接起服务了:

import (
	"net/http"
	
	"github.com/sjtug/lug/pkg/config"
	"github.com/sjtug/lug/pkg/exporter"
	"github.com/sjtug/lug/pkg/manager"
)
func main() {
	m, err := manager.NewManager(&cfg)
	if err != nil {
		panic(err)
	}
	jsonapi := manager.NewRestfulAPI(m)
	handler := jsonapi.GetAPIHandler()
	go http.ListenAndServe(cfg.JsonAPIConfig.Address, handler)
	go exporter.Expose(cfg.ExporterAddr)
	m.Run()
}

可以看到服务兵分三路:一个 jsonapi,一个 exporter,一个 manager。(其实还有个 log)


jsonapi 提供了一个 restful 接口进行一些操作:


// pkg/manager/json_rest.go
router, err := rest.MakeRouter(
	rest.Get("/lug/v1/admin/manager/detail", r.getManagerStatusDetail),
	rest.Get("/lug/v1/manager/summary", r.getManagerStatusSummary),
	rest.Post("/lug/v1/admin/manager/start", r.startManager),
	rest.Post("/lug/v1/admin/manager/stop", r.stopManager),
	rest.Delete("/lug/v1/admin/manager", r.exitManager),
)

没有鉴权,只能防火墙内用;估计现在也没实装。希望未来可视化控制面板可以支持!


exporter 在 {cfg.ExporterAddr}/metrics 下提供了一些 prometheus 风格统计数据接口供 Grafana 使用。以后再研究。


manager 是功能核心。从类型定义就可以了解大概

type Manager struct {
	config                *config.Config // main config
	workers               []worker.Worker // sync items
	workersLastInvokeTime map[string]time.Time
	controlChan           chan int // extern control signal
	finishChan            chan int // finish signal
	running               bool
	pendingQueue          []int // storing index of worker to launch
	logger                *logrus.Entry
}

大致上是伪 worker 结构,每个 worker 实际上就是一个脚本的配置项,既不是协程独立控制一条配置的同步,也不是少量 worker 从 task queue 取的结构,而是相当奇怪的架构。(可能有什么神奇的考虑吧)

直接来看 m.Run (暂时忽略 checkpoint 和 log 相关部分):

func (m *Manager) Run() {
	c := time.Tick(time.Duration(m.config.Interval) * time.Second)
	for _, w := range m.workers {
		go w.RunSync()
	}
	for {
		// wait until config.Interval seconds has elapsed
		select {
		case <-c:
			if m.running {
				running := 0
				for i, w := range m.workers {
					wStatus := w.GetStatus()
					if !wStatus.Idle {
						running++
						continue
					}
					wConfig := w.GetConfig()
					wTime := m.workersLastInvokeTime[wConfig["name"].(string)]
					elapsed := time.Since(wTime)
					sec2sync, ok := wConfig["interval"].(int)
					if !ok {
						sec2sync = 31536000 // if "interval" is not specified, then worker will launch once a year
					}
					time2sync := time.Duration(sec2sync)*time.Second
					if !m.isAlreadyInPendingQueue(i) && elapsed > time2sync {
						m.pendingQueue = append(m.pendingQueue, i)
					}
				}
				max_allowed := m.config.ConcurrentLimit - running
				new_idx := min(len(m.pendingQueue), max_allowed)
				to_launch := m.pendingQueue[:new_idx]
				m.pendingQueue = m.pendingQueue[new_idx:]
			
				for _, w_idx := range to_launch {
					w := m.workers[w_idx]
					wConfig := w.GetConfig()
					m.workersLastInvokeTime[wConfig["name"].(string)] = time.Now()
					w.TriggerSync()
				}
			}
		case sig, ok := <-m.controlChan:
			if ok {
				switch sig {
				case SigStart:
					m.running = true
					m.finishChan <- StartFinish
				case SigStop:
					m.running = false
					m.finishChan <- StopFinish
				case SigExit:
					goto END_OF_FINISH
				}
			}
		}
	}
END_OF_FINISH:
	m.finishChan <- ExitFinish
}

用一个 pendingQueue slice 储存了同步队列,一个循环只处理 ConcurrentLimit 以内的数目,剩下的下个循环(config.Interval 后)再处理。

非常期待以后重构成真 worker + channel 分配任务架构!


Worker 是实际执行任务的 interface。

// Worker declares interface for workers using diffenent ways of sync.
type Worker interface {
	// This call should be thread-safe
	GetStatus() Status
	// This should block forever
	RunSync()
	// This call should be thread-safe
	TriggerSync()

	GetConfig() config.RepoConfig
}

以前有两种 worker: script 和 rsync(external 是占位的,看起来仅供测试使用);早在 PR#50 rsync worker 就被移除了,因此现在只有一个 shell_script worker;而且它在 PR#69 被拆分成了 ShellScriptExecuterExecutorInvokeWorker

WIP

refactor:

  • 唯一 worker 类型

feat:

  • 真·worker
  • enable restapi
  • graceful exit
  • more monitor item
  • enable UpdateDiskUsage (目前没用)
  • 进度条