Skip to content

Commit 248194c

Browse files
committed
Make port preflight checks idempotent
1 parent 0d82c35 commit 248194c

File tree

8 files changed

+101
-51
lines changed

8 files changed

+101
-51
lines changed

integration/install_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ var _ = Describe("kismatic", func() {
270270
sub.It("nodes should contain expected component overrides", func() error {
271271
return ContainsOverrides(nodes, sshKey)
272272
})
273+
274+
sub.It("should allow for running preflight checks idempotently", func() error {
275+
return runValidate("kismatic-testing.yaml")
276+
})
273277
})
274278
})
275279
})
@@ -324,6 +328,10 @@ var _ = Describe("kismatic", func() {
324328
sub.It("nodes should contain expected labels", func() error {
325329
return containsLabels(nodes, sshKey)
326330
})
331+
332+
sub.It("should allow for running preflight checks idempotently", func() error {
333+
return runValidate("kismatic-testing.yaml")
334+
})
327335
})
328336
})
329337
})

integration/validate.go

+10-12
Original file line numberDiff line numberDiff line change
@@ -102,10 +102,7 @@ func ValidateKismaticMini(node NodeDeets, user, sshKey string) PlanAWS {
102102

103103
// Run validation
104104
By("Validate our plan")
105-
ver := exec.Command("./kismatic", "install", "validate", "-f", f.Name())
106-
ver.Stdout = os.Stdout
107-
ver.Stderr = os.Stderr
108-
err = ver.Run()
105+
err = runValidate(f.Name())
109106
FailIfError(err, "Error validating plan")
110107
return plan
111108
}
@@ -141,10 +138,7 @@ func ValidateKismaticMiniDenyPkgInstallation(node NodeDeets, sshUser, sshKey str
141138

142139
// Run validation
143140
By("Validate our plan")
144-
cmd := exec.Command("./kismatic", "install", "validate", "-f", f.Name())
145-
cmd.Stdout = os.Stdout
146-
cmd.Stderr = os.Stderr
147-
return cmd.Run()
141+
return runValidate(f.Name())
148142
}
149143

150144
func ValidateKismaticMiniWithBadSSH(node NodeDeets, user, sshKey string) PlanAWS {
@@ -175,10 +169,7 @@ func ValidateKismaticMiniWithBadSSH(node NodeDeets, user, sshKey string) PlanAWS
175169

176170
// Run validation
177171
By("Validate our plan")
178-
ver := exec.Command("./kismatic", "install", "validate", "-f", f.Name())
179-
ver.Stdout = os.Stdout
180-
ver.Stderr = os.Stderr
181-
err = ver.Run()
172+
err = runValidate(f.Name())
182173
FailIfSuccess(err)
183174
return plan
184175
}
@@ -196,3 +187,10 @@ func getBadSSHKeyFile() (string, error) {
196187

197188
return filepath.Join(dir, ".ssh", "bad.pem"), nil
198189
}
190+
191+
// runValidate runs "./kismatic install validate -f <planFile>" with the
// command's stdout and stderr wired to this process's stdout and stderr, and
// returns the error reported by the command's Run call.
func runValidate(planFile string) error {
192+
cmd := exec.Command("./kismatic", "install", "validate", "-f", planFile)
193+
cmd.Stdout = os.Stdout
194+
cmd.Stderr = os.Stderr
195+
return cmd.Run()
196+
}

pkg/inspector/check/tcp.go

+40-24
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package check
22

33
import (
4-
"bufio"
5-
"errors"
64
"fmt"
75
"io"
86
"log"
97
"net"
8+
"os/exec"
109
"strings"
1110
"time"
1211
)
@@ -34,41 +33,29 @@ func (c *TCPPortClientCheck) Check() (bool, error) {
3433
if err != nil {
3534
return false, fmt.Errorf("Port %d on host %q is unreachable. Error was: %v", c.PortNumber, c.IPAddress, err)
3635
}
37-
38-
testMsg := "ECHO\n"
39-
fmt.Fprint(conn, testMsg)
40-
resp, err := bufio.NewReader(conn).ReadString('\n')
41-
if err == io.EOF {
42-
return false, nil // The server sent an empty response
43-
}
44-
if err != nil {
45-
return false, fmt.Errorf("error reading from TCP socket: %v", err)
46-
}
47-
if resp != testMsg {
48-
return false, nil
49-
}
36+
conn.Close()
5037
return true, nil
5138
}
5239

53-
// TCPPortServerCheck ensures that the given port is free, and stands up a TCP server that can be used to
54-
// check TCP connectivity to the host using TCPPortClientCheck
40+
// TCPPortServerCheck ensures that the given port is free, or bound to the right
41+
// process. In the case that it is free, it stands up a TCP server that can be
42+
// used to check TCP connectivity to the host using TCPPortClientCheck
5543
type TCPPortServerCheck struct {
5644
PortNumber int
45+
ProcName string
5746
started bool
5847
closeListener func() error
5948
listenerClosed chan interface{}
6049
}
6150

62-
// Check returns true if the port is available for the server. Otherwise returns false
63-
// and an error message
51+
// Check returns true if the port is free, or taken by the expected process.
6452
func (c *TCPPortServerCheck) Check() (bool, error) {
6553
ln, err := net.Listen("tcp", fmt.Sprintf(":%d", c.PortNumber))
6654
if err != nil && strings.Contains(err.Error(), "address already in use") {
67-
return false, nil
55+
return portTakenByProc(c.PortNumber, c.ProcName)
6856
}
6957
if err != nil {
70-
// TODO: We could check if the port is being used here..
71-
return false, fmt.Errorf("error listening on port %d", c.PortNumber)
58+
return false, fmt.Errorf("error listening on port %d: %v", c.PortNumber, err)
7259
}
7360
c.closeListener = ln.Close
7461
// Setup go routine for accepting connections
@@ -98,11 +85,40 @@ func (c *TCPPortServerCheck) Check() (bool, error) {
9885
return true, nil
9986
}
10087

101-
// Close the TCP server
88+
// Close the TCP server if it was started. Otherwise this is a noop.
10289
func (c *TCPPortServerCheck) Close() error {
10390
if c.started {
10491
close(c.listenerClosed)
10592
return c.closeListener()
10693
}
107-
return errors.New("called close on a TCPPortServerCheck that is not started")
94+
return nil
95+
}
96+
97+
// portTakenByProc reports whether the given TCP port is bound, in LISTEN
// state, by a process whose command name equals procName.
//
// It shells out to lsof to find the listening process:
//
//	~# lsof -i TCP:2379 -s TCP:LISTEN -Pn +c 0
//	COMMAND       PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
//	docker-proxy 7294 root    4u  IPv6  43407      0t0  TCP *:2379 (LISTEN)
//
// An error is returned when lsof cannot be run or does not complete
// successfully, or when its output has no process row after the header.
func portTakenByProc(port int, procName string) (bool, error) {
	portArg := fmt.Sprintf("TCP:%d", port)
	cmd := exec.Command("lsof", "-i", portArg, "-s", "TCP:LISTEN", "-Pn", "+c", "0")
	out, err := cmd.Output()
	if err != nil {
		return false, fmt.Errorf("error running lsof: %v", err)
	}
	owner, err := lsofFirstCommand(string(out))
	if err != nil {
		return false, err
	}
	return owner == procName, nil
}

// lsofFirstCommand returns the COMMAND column of the first process row that
// follows the header line in lsof output.
//
// There are cases where lsof will return multiple lines for the same port.
// For example:
//
//	~# lsof -i TCP:$port -s TCP:LISTEN -Pn +c 0
//	COMMAND   PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
//	nginx   18611   root  10u   IPv4  82841      0t0  TCP *:80 (LISTEN)
//	nginx   18628 nobody  10u   IPv4  82841      0t0  TCP *:80 (LISTEN)
//	nginx   18629 nobody  10u   IPv4  82841      0t0  TCP *:80 (LISTEN)
//
// Only the first row after the header is used for the process name.
func lsofFirstCommand(output string) (string, error) {
	lines := strings.Split(output, "\n")
	if len(lines) < 2 {
		return "", fmt.Errorf("expected lsof to return at least 2 lines, but returned %d", len(lines))
	}
	// Skip blank rows: splitting "header\n" yields a trailing empty string,
	// and indexing the fields of an empty row would panic.
	for _, line := range lines[1:] {
		if fields := strings.Fields(line); len(fields) > 0 {
			return fields[0], nil
		}
	}
	return "", fmt.Errorf("lsof output did not contain a process entry after the header")
}

pkg/inspector/rule/check_mapper.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (m DefaultCheckMapper) GetCheckForRule(rule Rule) (check.Check, error) {
3838
case FileContentMatches:
3939
c = check.FileContentCheck{File: r.File, SearchString: r.ContentRegex}
4040
case TCPPortAvailable:
41-
c = &check.TCPPortServerCheck{PortNumber: r.Port}
41+
c = &check.TCPPortServerCheck{PortNumber: r.Port, ProcName: r.ProcName}
4242
case TCPPortAccessible:
4343
timeout, err := time.ParseDuration(r.Timeout)
4444
if err != nil {

pkg/inspector/rule/encoding.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type catchAllRule struct {
4040
AnyVersion bool `yaml:"anyVersion"`
4141
Executable string `yaml:"executable"`
4242
Port int `yaml:"port"`
43+
ProcName string `yaml:"procName"`
4344
File string `yaml:"file"`
4445
ContentRegex string `yaml:"contentRegex"`
4546
Timeout string `yaml:"timeout"`
@@ -103,7 +104,8 @@ func buildRule(catchAll catchAllRule) (Rule, error) {
103104
return r, nil
104105
case "tcpportavailable":
105106
r := TCPPortAvailable{
106-
Port: catchAll.Port,
107+
Port: catchAll.Port,
108+
ProcName: catchAll.ProcName,
107109
}
108110
r.Meta = meta
109111
return r, nil

pkg/inspector/rule/rule_set.go

+22-8
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,19 @@ const defaultRuleSet = `---
3535
- kind: TCPPortAvailable
3636
when: ["etcd"]
3737
port: 2379
38+
procName: docker-proxy # docker sets up a proxy for the etcd container
3839
- kind: TCPPortAvailable
3940
when: ["etcd"]
4041
port: 6666
42+
procName: docker-proxy # docker sets up a proxy for the etcd container
4143
- kind: TCPPortAvailable
4244
when: ["etcd"]
4345
port: 2380
46+
procName: docker-proxy # docker sets up a proxy for the etcd container
4447
- kind: TCPPortAvailable
4548
when: ["etcd"]
4649
port: 6660
50+
procName: docker-proxy # docker sets up a proxy for the etcd container
4751
4852
# Ports used by etcd are accessible
4953
- kind: TCPPortAccessible
@@ -67,17 +71,21 @@ const defaultRuleSet = `---
6771
- kind: TCPPortAvailable
6872
when: ["master"]
6973
port: 6443
74+
procName: kube-apiserver
7075
- kind: TCPPortAvailable
7176
when: ["master"]
7277
port: 8080
78+
procName: kube-apiserver
7379
# kube-scheduler
7480
- kind: TCPPortAvailable
7581
when: ["master"]
7682
port: 10251
83+
procName: kube-scheduler
7784
# kube-controller-manager
7885
- kind: TCPPortAvailable
7986
when: ["master"]
8087
port: 10252
88+
procName: kube-controller
8189
8290
# Ports used by K8s master are accessible
8391
# Port 8080 is not accessible from outside
@@ -101,26 +109,32 @@ const defaultRuleSet = `---
101109
- kind: TCPPortAvailable
102110
when: ["master","worker","ingress","storage"]
103111
port: 4194
112+
procName: kubelet
104113
# kubelet localhost healthz
105114
- kind: TCPPortAvailable
106115
when: ["master","worker","ingress","storage"]
107116
port: 10248
117+
procName: kubelet
108118
# kube-proxy metrics
109119
- kind: TCPPortAvailable
110120
when: ["master","worker","ingress","storage"]
111121
port: 10249
122+
procName: kube-proxy
112123
# kube-proxy health
113124
- kind: TCPPortAvailable
114125
when: ["master","worker","ingress","storage"]
115126
port: 10256
127+
procName: kube-proxy
116128
# kubelet
117129
- kind: TCPPortAvailable
118130
when: ["master","worker","ingress","storage"]
119131
port: 10250
132+
procName: kubelet
120133
# kubelet no auth
121134
- kind: TCPPortAvailable
122135
when: ["master","worker","ingress","storage"]
123136
port: 10255
137+
procName: kubelet
124138
125139
# Ports used by K8s worker are accessible
126140
# cAdvisor
@@ -143,13 +157,15 @@ const defaultRuleSet = `---
143157
- kind: TCPPortAvailable
144158
when: ["ingress"]
145159
port: 80
160+
procName: nginx
146161
- kind: TCPPortAccessible
147162
when: ["ingress"]
148163
port: 80
149164
timeout: 5s
150165
- kind: TCPPortAvailable
151166
when: ["ingress"]
152167
port: 443
168+
procName: nginx
153169
- kind: TCPPortAccessible
154170
when: ["ingress"]
155171
port: 443
@@ -158,6 +174,7 @@ const defaultRuleSet = `---
158174
- kind: TCPPortAvailable
159175
when: ["ingress"]
160176
port: 10254
177+
procName: nginx-ingress-c
161178
- kind: TCPPortAccessible
162179
when: ["ingress"]
163180
port: 10254
@@ -392,44 +409,41 @@ const defaultRuleSet = `---
392409
- kind: TCPPortAvailable
393410
when: ["storage"]
394411
port: 8081
412+
procName: exechealthz
395413
- kind: TCPPortAccessible
396414
when: ["storage"]
397415
port: 8081
398416
timeout: 5s
399417
400418
# Ports required for NFS
401-
# Removed due to https://github.com/apprenda/kismatic/issues/784
402-
#- kind: TCPPortAvailable
403-
# when: ["storage"]
404-
# port: 111
405-
#- kind: TCPPortAccessible
406-
# when: ["storage"]
407-
# port: 111
408-
# timeout: 5s
409419
- kind: TCPPortAvailable
410420
when: ["storage"]
411421
port: 2049
422+
procName: glusterfs
412423
- kind: TCPPortAccessible
413424
when: ["storage"]
414425
port: 2049
415426
timeout: 5s
416427
- kind: TCPPortAvailable
417428
when: ["storage"]
418429
port: 38465
430+
procName: glusterfs
419431
- kind: TCPPortAccessible
420432
when: ["storage"]
421433
port: 38465
422434
timeout: 5s
423435
- kind: TCPPortAvailable
424436
when: ["storage"]
425437
port: 38466
438+
procName: glusterfs
426439
- kind: TCPPortAccessible
427440
when: ["storage"]
428441
port: 38466
429442
timeout: 5s
430443
- kind: TCPPortAvailable
431444
when: ["storage"]
432445
port: 38467
446+
procName: glusterfs
433447
- kind: TCPPortAccessible
434448
when: ["storage"]
435449
port: 38467

pkg/inspector/rule/tcp_port.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@ import (
77
)
88

99
// TCPPortAvailable is a rule that ensures that a given port is available
10-
// on the node. Available means that the port is not being used by another
11-
// process.
10+
// on the node. The port is considered available if:
11+
// - The port is free and ready to be bound by a new process, or
12+
// - The port is bound to the process defined in ProcName
1213
type TCPPortAvailable struct {
1314
Meta
15+
// The port number to verify
1416
Port int
17+
// The name of the process that owns this port after KET installation
18+
ProcName string
1519
}
1620

1721
// Name is the name of the rule
@@ -24,10 +28,14 @@ func (p TCPPortAvailable) IsRemoteRule() bool { return false }
2428

2529
// Validate the rule
2630
func (p TCPPortAvailable) Validate() []error {
31+
var errs []error
2732
if p.Port < 1 || p.Port > 65535 {
28-
return []error{fmt.Errorf("Invalid port number %d specified", p.Port)}
33+
errs = append(errs, fmt.Errorf("Invalid port number %d specified", p.Port))
2934
}
30-
return nil
35+
if p.ProcName == "" {
36+
errs = append(errs, fmt.Errorf("ProcName cannot be empty"))
37+
}
38+
return errs
3139
}
3240

3341
// TCPPortAccessible is a rule that ensures the given port on a remote node

0 commit comments

Comments
 (0)