lug 是 sjtug mirror 最为核心一个组件。其主要功能是定时跑脚本;功能很简单,代码量不少,结构很是清晰。
├── Dockerfile
├── LICENSE
├── README.md
├── cli/lug
│ ├── license.go
│ └── main.go
├── go.mod
├── go.sum
└── pkg
├── config
│ └── config.go
├── exporter
│ └── exporter.go
├── helper
│ ├── disk_usage.go
│ └── max_length_slice.go
├── manager
│ ├── json_rest.go
│ └── manager.go
└── worker
├── executor.go
├── executor_invoke_worker.go
├── external_worker.go
├── shell_script_executor.go
├── utilities.go
├── utility_rlimit.go
└── worker.go
先从入口开始吧。入口位于 cli/lug,licence.go
由 scripts/gen_license.sh
(从被 git ignore 的 vendor/
)生成,看起来像是圣遗物…主要的还是 main.go
。
命令行解析用了 pflag。
日志用了 logrus 作为日志库, logrustash 作为兼容 Logstash 的日志 hook,从而为 Grafana 所用。它通过 cfg.LogStashConfig
配置文件项通过 prepareLogger
函数启动。
命令行解析、配置解析、日志启动的部分放在了 init
里,按下不表。
主函数就直接起服务了:
import (
"net/http"
"github.com/sjtug/lug/pkg/config"
"github.com/sjtug/lug/pkg/exporter"
"github.com/sjtug/lug/pkg/manager"
)
func main() {
m, err := manager.NewManager(&cfg)
if err != nil {
panic(err)
}
jsonapi := manager.NewRestfulAPI(m)
handler := jsonapi.GetAPIHandler()
go http.ListenAndServe(cfg.JsonAPIConfig.Address, handler)
go exporter.Expose(cfg.ExporterAddr)
m.Run()
}
可以看到服务兵分三路:一个 jsonapi,一个 exporter,一个 manager。(其实还有个 log)
jsonapi 提供了一个 restful 接口进行一些操作:
// pkg/manager/json_rest.go
router, err := rest.MakeRouter(
rest.Get("/lug/v1/admin/manager/detail", r.getManagerStatusDetail),
rest.Get("/lug/v1/manager/summary", r.getManagerStatusSummary),
rest.Post("/lug/v1/admin/manager/start", r.startManager),
rest.Post("/lug/v1/admin/manager/stop", r.stopManager),
rest.Delete("/lug/v1/admin/manager", r.exitManager),
)
没有鉴权,只能防火墙内用;估计现在也没实装。希望未来可视化控制面板可以支持!
exporter 在 {cfg.ExporterAddr}/metrics
下提供了一些 prometheus 风格统计数据接口供 Grafana 使用。以后再研究。
manager 是功能核心。从类型定义就可以了解大概
type Manager struct {
config *config.Config // main config
workers []worker.Worker // sync items
workersLastInvokeTime map[string]time.Time
controlChan chan int // extern control signal
finishChan chan int // finish signal
running bool
pendingQueue []int // storing index of worker to launch
logger *logrus.Entry
}
大致上是伪 worker 结构,每个 worker 实际上就是一个脚本的配置项,既不是协程独立控制一条配置的同步,也不是少量 worker 从 task queue 取的结构,而是相当奇怪的架构。(可能有什么神奇的考虑吧)
直接来看 m.Run
(暂时忽略 checkpoint 和 log 相关部分):
func (m *Manager) Run() {
c := time.Tick(time.Duration(m.config.Interval) * time.Second)
for _, w := range m.workers {
go w.RunSync()
}
for {
// wait until config.Interval seconds has elapsed
select {
case <-c:
if m.running {
running := 0
for i, w := range m.workers {
wStatus := w.GetStatus()
if !wStatus.Idle {
running++
continue
}
wConfig := w.GetConfig()
wTime := m.workersLastInvokeTime[wConfig["name"].(string)]
elapsed := time.Since(wTime)
sec2sync, ok := wConfig["interval"].(int)
if !ok {
sec2sync = 31536000 // if "interval" is not specified, then worker will launch once a year
}
time2sync := time.Duration(sec2sync)*time.Second
if !m.isAlreadyInPendingQueue(i) && elapsed > time2sync {
m.pendingQueue = append(m.pendingQueue, i)
}
}
max_allowed := m.config.ConcurrentLimit - running
new_idx := min(len(m.pendingQueue), max_allowed)
to_launch := m.pendingQueue[:new_idx]
m.pendingQueue = m.pendingQueue[new_idx:]
for _, w_idx := range to_launch {
w := m.workers[w_idx]
wConfig := w.GetConfig()
m.workersLastInvokeTime[wConfig["name"].(string)] = time.Now()
w.TriggerSync()
}
}
case sig, ok := <-m.controlChan:
if ok {
switch sig {
case SigStart:
m.running = true
m.finishChan <- StartFinish
case SigStop:
m.running = false
m.finishChan <- StopFinish
case SigExit:
goto END_OF_FINISH
}
}
}
}
END_OF_FINISH:
m.finishChan <- ExitFinish
}
用一个 pendingQueue
slice 储存了同步队列,一个循环只处理 ConcurrentLimit
以内的数目,剩下的下个循环(config.Interval
后)再处理。
非常期待以后重构成真 worker + channel 分配任务架构!
Worker 是实际执行任务的 interface。
// Worker declares interface for workers using diffenent ways of sync.
type Worker interface {
// This call should be thread-safe
GetStatus() Status
// This should block forever
RunSync()
// This call should be thread-safe
TriggerSync()
GetConfig() config.RepoConfig
}
以前有两种 worker: script 和 rsync(external 是占位的,看起来仅供测试使用);早在 PR#50 rsync worker 就被移除了,因此现在只有一个 shell_script worker;而且它在 PR#69 被拆分成了 ShellScriptExecuter
和 ExecutorInvokeWorker
。
WIP
refactor:
- 唯一 worker 类型
feat:
- 真·worker
- enable restapi
- graceful exit
- more monitor item
- enable UpdateDiskUsage (目前没用)
- 进度条