-
Notifications
You must be signed in to change notification settings - Fork 65
/
main.go
127 lines (103 loc) · 4.62 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"log"
"strings"
"time"
"github.com/box/kube-applier/applylist"
"github.com/box/kube-applier/git"
"github.com/box/kube-applier/kube"
"github.com/box/kube-applier/metrics"
"github.com/box/kube-applier/run"
"github.com/box/kube-applier/sysutil"
"github.com/box/kube-applier/webserver"
)
// Timing settings for repo polling, periodic full runs, and the startup wait for the repo.
const (
	// defaultPollIntervalSeconds is the number of seconds to wait between checks
	// of the Git repo for new commits when no override is configured.
	defaultPollIntervalSeconds = 5

	// defaultFullRunIntervalSeconds is the number of seconds to wait between apply
	// runs when no new commits have been made to the repo (five minutes).
	defaultFullRunIntervalSeconds = 300

	// waitForRepoInterval is the pause between attempts to locate the repo at the
	// specified path. Unlike the two values above it is a time.Duration, not a
	// bare count of seconds.
	// Git-sync atomically places the repo at the specified path once it has
	// finished pulling, so the repo will not be present immediately.
	waitForRepoInterval = time.Second
)
// main reads its configuration from environment variables, builds the kube
// client, Git helper, apply-list factory, metrics handler, runner, scheduler
// and webserver, starts each long-running component in its own goroutine, and
// then blocks until one of them reports a fatal error.
func main() {
	// REPO_PATH and LISTEN_PORT go through the "required" getters; everything
	// else falls back to the default passed alongside the variable name.
	repoDir := sysutil.GetRequiredEnvString("REPO_PATH")
	port := sysutil.GetRequiredEnvInt("LISTEN_PORT")
	apiServer := sysutil.GetEnvStringOrDefault("SERVER", "")
	blacklistFile := sysutil.GetEnvStringOrDefault("BLACKLIST_PATH", "")
	verbosity := sysutil.GetEnvIntOrDefault("LOG_LEVEL", -1)

	// WHITELIST_PATH names a file listing the files to consider for application.
	// When the variable is unset or the file is empty it acts as a no-op and
	// every file is considered.
	whitelistFile := sysutil.GetEnvStringOrDefault("WHITELIST_PATH", "")

	diffFormat := sysutil.GetEnvStringOrDefault("DIFF_URL_FORMAT", "")

	pollSeconds := sysutil.GetEnvIntOrDefault("POLL_INTERVAL_SECONDS", defaultPollIntervalSeconds)
	pollEvery := time.Duration(pollSeconds) * time.Second
	fullRunSeconds := sysutil.GetEnvIntOrDefault("FULL_RUN_INTERVAL_SECONDS", defaultFullRunIntervalSeconds)
	fullRunEvery := time.Duration(fullRunSeconds) * time.Second

	// A non-empty diff URL format needs a "%s" placeholder to be usable.
	if diffFormat != "" && !strings.Contains(diffFormat, "%s") {
		log.Fatalf("Invalid DIFF_URL_FORMAT, must contain %q: %v", "%s", diffFormat)
	}

	// Wait for the repo directory to show up before wiring anything that depends on it.
	sysClock := &sysutil.Clock{}
	if waitErr := sysutil.WaitForDir(repoDir, sysClock, waitForRepoInterval); waitErr != nil {
		log.Fatal(waitErr)
	}

	kubeCli := &kube.Client{
		Server:   apiServer,
		LogLevel: verbosity,
	}
	kubeCli.Configure()

	repo := &git.GitUtil{repoDir}
	fs := &sysutil.FileSystem{}
	applyLists := &applylist.Factory{repoDir, blacklistFile, whitelistFile, fs}

	// Full-run requests: sent by the webserver and the scheduler, received by the
	// runner, which then initiates a full run. Capacity 1 means at most one
	// request can be pending at a time.
	fullRuns := make(chan bool, 1)

	// Quick-run requests: when a new Git commit comes in, the scheduler sends its
	// hash here; the runner receives it and initiates a quick run, using the hash
	// for a diff. Capacity 1 means at most one request can be pending at a time.
	quickRuns := make(chan string, 1)

	// Run results travelling from the runner to the webserver for display.
	// The capacity of 5 is arbitrary: sends are far apart and receives are
	// handled near instantaneously.
	results := make(chan run.Result, 5)

	// Run results travelling from the runner to the metrics handler, which
	// updates its metrics from them. The capacity of 5 is arbitrary for the same
	// reason as above.
	metricResults := make(chan run.Result, 5)

	// Fatal errors from the runner, webserver and scheduler. No buffer is needed
	// because the first error received below exits the program.
	fatalErrs := make(chan error)

	// Running count of total runs, used as a run ID for logging. Being unbuffered,
	// it blocks on both sides: the counter cannot advance until some run pops the
	// current value, and a run cannot pop a value until the counter has updated it.
	runIDs := make(chan int)

	prom := &metrics.Prometheus{RunMetrics: metricResults}
	prom.Configure()

	applier := &run.BatchApplier{kubeCli}

	pollTick := time.Tick(pollEvery)
	fullRunTick := time.Tick(fullRunEvery)

	// The project structs below are filled positionally, so argument order is
	// significant and must follow each struct's field order.
	applyRunner := &run.Runner{
		applier,
		applyLists,
		repo,
		sysClock,
		diffFormat,
		"",
		quickRuns,
		fullRuns,
		results,
		metricResults,
		fatalErrs,
		runIDs,
	}
	sched := &run.Scheduler{repo, pollTick, fullRunTick, quickRuns, fullRuns, fatalErrs, ""}
	web := &webserver.WebServer{port, sysClock, prom.GetHandler(), fullRuns, results, fatalErrs}

	go prom.StartMetricsLoop()
	go sched.Start()
	go applyRunner.StartRunCounter()
	go applyRunner.StartQuickLoop()
	go applyRunner.StartFullLoop()
	go web.Start()

	// Block here; the first fatal error reported by any component ends the process.
	for fatalErr := range fatalErrs {
		log.Fatal(fatalErr)
	}
}