Skip to content

Commit 926253e

Browse files
committed
refactor: implement volume mount controller
Fixes #9602 Aggregate incoming volume mount requests, reconcile them with volume status, perform actual mounting, and produce mount status. Signed-off-by: Andrey Smirnov <[email protected]>
1 parent 2b5bd5d commit 926253e

File tree

22 files changed

+3041
-520
lines changed

22 files changed

+3041
-520
lines changed

api/resource/definitions/block/block.proto

+31
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,27 @@ message LocatorSpec {
111111
google.api.expr.v1alpha1.CheckedExpr match = 1;
112112
}
113113

114+
// MountRequestSpec is the spec for MountRequest.
115+
message MountRequestSpec {
116+
string volume_id = 1;
117+
string parent_mount_id = 2;
118+
repeated string requesters = 3;
119+
repeated string requester_i_ds = 4;
120+
}
121+
114122
// MountSpec is the spec for volume mount.
115123
message MountSpec {
116124
string target_path = 1;
117125
string selinux_label = 2;
126+
repeated string options = 3;
127+
}
128+
129+
// MountStatusSpec is the spec for MountStatus.
130+
message MountStatusSpec {
131+
MountRequestSpec spec = 1;
132+
string target = 2;
133+
string source = 3;
134+
talos.resource.definitions.enums.BlockFilesystemType filesystem = 4;
118135
}
119136

120137
// PartitionSpec is the spec for volume partitioning.
@@ -160,6 +177,19 @@ message VolumeConfigSpec {
160177
EncryptionSpec encryption = 6;
161178
}
162179

180+
// VolumeMountRequestSpec is the spec for VolumeMountRequest.
181+
message VolumeMountRequestSpec {
182+
string volume_id = 1;
183+
string requester = 2;
184+
}
185+
186+
// VolumeMountStatusSpec is the spec for VolumeMountStatus.
187+
message VolumeMountStatusSpec {
188+
string volume_id = 1;
189+
string requester = 2;
190+
string target = 3;
191+
}
192+
163193
// VolumeStatusSpec is the spec for VolumeStatus resource.
164194
message VolumeStatusSpec {
165195
talos.resource.definitions.enums.BlockVolumePhase phase = 1;
@@ -176,5 +206,6 @@ message VolumeStatusSpec {
176206
talos.resource.definitions.enums.BlockEncryptionProviderType encryption_provider = 12;
177207
string pretty_size = 13;
178208
repeated string encryption_failed_syncs = 14;
209+
MountSpec mount_spec = 15;
179210
}
180211

internal/app/machined/pkg/controllers/block/internal/volumes/locate.go

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
//
2525
//nolint:gocyclo,cyclop
2626
func LocateAndProvision(ctx context.Context, logger *zap.Logger, volumeContext ManagerContext) error {
27+
volumeContext.Status.MountSpec = volumeContext.Cfg.TypedSpec().Mount
2728
volumeType := volumeContext.Cfg.TypedSpec().Type
2829

2930
if volumeType == block.VolumeTypeTmpfs {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4+
5+
package block
6+
7+
import (
8+
"context"
9+
"fmt"
10+
11+
"github.com/cosi-project/runtime/pkg/controller"
12+
"github.com/cosi-project/runtime/pkg/resource"
13+
"github.com/cosi-project/runtime/pkg/safe"
14+
"github.com/cosi-project/runtime/pkg/state"
15+
"github.com/siderolabs/gen/xslices"
16+
"go.uber.org/zap"
17+
18+
"github.com/siderolabs/talos/internal/pkg/mount/v2"
19+
"github.com/siderolabs/talos/pkg/machinery/resources/block"
20+
)
21+
22+
// MountController performs actual mount/unmount operations based on the MountRequests.
23+
type MountController struct {
24+
activeMounts map[string]func() error
25+
}
26+
27+
// Name implements controller.Controller interface.
28+
func (ctrl *MountController) Name() string {
29+
return "block.MountController"
30+
}
31+
32+
// Inputs implements controller.Controller interface.
33+
func (ctrl *MountController) Inputs() []controller.Input {
34+
return []controller.Input{
35+
{
36+
Namespace: block.NamespaceName,
37+
Type: block.MountRequestType,
38+
Kind: controller.InputStrong,
39+
},
40+
{
41+
Namespace: block.NamespaceName,
42+
Type: block.VolumeStatusType,
43+
Kind: controller.InputStrong,
44+
},
45+
{
46+
Namespace: block.NamespaceName,
47+
Type: block.MountStatusType,
48+
Kind: controller.InputDestroyReady,
49+
},
50+
}
51+
}
52+
53+
// Outputs implements controller.Controller interface.
54+
func (ctrl *MountController) Outputs() []controller.Output {
55+
return []controller.Output{
56+
{
57+
Type: block.MountStatusType,
58+
Kind: controller.OutputExclusive,
59+
},
60+
}
61+
}
62+
63+
// Run implements controller.Controller interface.
64+
//
65+
//nolint:gocyclo,cyclop
66+
func (ctrl *MountController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
67+
if ctrl.activeMounts == nil {
68+
ctrl.activeMounts = map[string]func() error{}
69+
}
70+
71+
for {
72+
select {
73+
case <-r.EventCh():
74+
case <-ctx.Done():
75+
return nil
76+
}
77+
78+
volumeStatuses, err := safe.ReaderListAll[*block.VolumeStatus](ctx, r)
79+
if err != nil {
80+
return fmt.Errorf("failed to read volume statuses: %w", err)
81+
}
82+
83+
volumeStatusMap := xslices.ToMap(
84+
safe.ToSlice(
85+
volumeStatuses,
86+
identity,
87+
),
88+
func(v *block.VolumeStatus) (string, *block.VolumeStatus) {
89+
return v.Metadata().ID(), v
90+
},
91+
)
92+
93+
mountStatuses, err := safe.ReaderListAll[*block.MountStatus](ctx, r)
94+
if err != nil {
95+
return fmt.Errorf("failed to read mount statuses: %w", err)
96+
}
97+
98+
mountStatusMap := xslices.ToMap(
99+
safe.ToSlice(
100+
mountStatuses,
101+
identity,
102+
),
103+
func(v *block.MountStatus) (string, *block.MountStatus) {
104+
return v.Metadata().ID(), v
105+
},
106+
)
107+
108+
mountRequests, err := safe.ReaderListAll[*block.MountRequest](ctx, r)
109+
if err != nil {
110+
return fmt.Errorf("failed to read mount requests: %w", err)
111+
}
112+
113+
for mountRequest := range mountRequests.All() {
114+
volumeStatus := volumeStatusMap[mountRequest.TypedSpec().VolumeID]
115+
volumeNotReady := volumeStatus == nil || volumeStatus.TypedSpec().Phase != block.VolumePhaseReady || volumeStatus.Metadata().Phase() != resource.PhaseRunning
116+
117+
mountRequestTearingDown := mountRequest.Metadata().Phase() == resource.PhaseTearingDown
118+
119+
mountStatus := mountStatusMap[mountRequest.Metadata().ID()]
120+
mountStatusTearingDown := mountStatus != nil && mountStatus.Metadata().Phase() == resource.PhaseTearingDown
121+
122+
if volumeNotReady || mountRequestTearingDown || mountStatusTearingDown {
123+
// we should tear down the mount in the following sequence:
124+
// 1. tear down & destroy MountStatus
125+
// 2. perform actual unmount
126+
// 3. remove finalizer from VolumeStatus
127+
// 4. remove finalizer from MountRequest
128+
mountStatusTornDown, err := ctrl.tearDownMountStatus(ctx, r, logger, mountRequest)
129+
if err != nil {
130+
return fmt.Errorf("error tearing down mount status %q: %w", mountRequest.Metadata().ID(), err)
131+
}
132+
133+
if !mountStatusTornDown {
134+
continue
135+
}
136+
137+
if unmounter, ok := ctrl.activeMounts[mountRequest.Metadata().ID()]; ok {
138+
if err = unmounter(); err != nil {
139+
return fmt.Errorf("failed to unmount %q: %w", mountRequest.Metadata().ID(), err)
140+
}
141+
142+
delete(ctrl.activeMounts, mountRequest.Metadata().ID())
143+
}
144+
145+
if volumeStatus != nil && volumeStatus.Metadata().Finalizers().Has(ctrl.Name()) {
146+
if err = r.RemoveFinalizer(ctx, volumeStatus.Metadata(), ctrl.Name()); err != nil {
147+
return fmt.Errorf("failed to remove finalizer from volume status %q: %w", volumeStatus.Metadata().ID(), err)
148+
}
149+
}
150+
151+
if mountRequest.Metadata().Finalizers().Has(ctrl.Name()) {
152+
if err = r.RemoveFinalizer(ctx, mountRequest.Metadata(), ctrl.Name()); err != nil {
153+
return fmt.Errorf("failed to remove finalizer from mount request %q: %w", mountRequest.Metadata().ID(), err)
154+
}
155+
}
156+
}
157+
158+
if !(volumeNotReady || mountRequestTearingDown) {
159+
// we should perform mount operation in the following sequence:
160+
// 1. add finalizer on MountRequest
161+
// 2. add finalizer on VolumeStatus
162+
// 3. perform actual mount
163+
// 4. create MountStatus
164+
if !mountRequest.Metadata().Finalizers().Has(ctrl.Name()) {
165+
if err = r.AddFinalizer(ctx, mountRequest.Metadata(), ctrl.Name()); err != nil {
166+
return fmt.Errorf("failed to add finalizer to mount request %q: %w", mountRequest.Metadata().ID(), err)
167+
}
168+
}
169+
170+
if !volumeStatus.Metadata().Finalizers().Has(ctrl.Name()) {
171+
if err = r.AddFinalizer(ctx, volumeStatus.Metadata(), ctrl.Name()); err != nil {
172+
return fmt.Errorf("failed to add finalizer to volume status %q: %w", volumeStatus.Metadata().ID(), err)
173+
}
174+
}
175+
176+
mountSource := volumeStatus.TypedSpec().MountLocation
177+
mountTarget := volumeStatus.TypedSpec().MountSpec.TargetPath
178+
mountFilesystem := volumeStatus.TypedSpec().Filesystem
179+
180+
// mount hasn't been done yet
181+
if _, ok := ctrl.activeMounts[mountRequest.Metadata().ID()]; !ok {
182+
var opts []mount.NewPointOption
183+
184+
// [TODO]: need to support more mount options:
185+
// * proj quota (static)
186+
// * ro/rw (supports re-mounting)
187+
188+
opts = append(opts, mount.WithSelinuxLabel(volumeStatus.TypedSpec().MountSpec.SelinuxLabel))
189+
190+
mountpoint := mount.NewPoint(
191+
mountSource,
192+
mountTarget,
193+
mountFilesystem.String(),
194+
opts...,
195+
)
196+
197+
unmounter, err := mountpoint.Mount(mount.WithMountPrinter(logger.Sugar().Infof))
198+
if err != nil {
199+
return fmt.Errorf("failed to mount %q: %w", mountRequest.Metadata().ID(), err)
200+
}
201+
202+
logger.Info("volume mount",
203+
zap.String("volume", volumeStatus.Metadata().ID()),
204+
zap.String("source", mountSource),
205+
zap.String("target", mountTarget),
206+
zap.Stringer("filesystem", mountFilesystem),
207+
)
208+
209+
ctrl.activeMounts[mountRequest.Metadata().ID()] = unmounter
210+
}
211+
212+
if err = safe.WriterModify(
213+
ctx, r, block.NewMountStatus(block.NamespaceName, mountRequest.Metadata().ID()),
214+
func(mountStatus *block.MountStatus) error {
215+
mountStatus.TypedSpec().Spec = *mountRequest.TypedSpec()
216+
mountStatus.TypedSpec().Source = mountSource
217+
mountStatus.TypedSpec().Target = mountTarget
218+
mountStatus.TypedSpec().Filesystem = mountFilesystem
219+
220+
return nil
221+
},
222+
); err != nil {
223+
return fmt.Errorf("failed to create mount status %q: %w", mountRequest.Metadata().ID(), err)
224+
}
225+
}
226+
}
227+
228+
r.ResetRestartBackoff()
229+
}
230+
}
231+
232+
func (ctrl *MountController) tearDownMountStatus(ctx context.Context, r controller.Runtime, logger *zap.Logger, mountRequest *block.MountRequest) (bool, error) {
233+
logger = logger.With(zap.String("mount_request", mountRequest.Metadata().ID()))
234+
235+
okToDestroy, err := r.Teardown(ctx, block.NewMountStatus(block.NamespaceName, mountRequest.Metadata().ID()).Metadata())
236+
if err != nil {
237+
if state.IsNotFoundError(err) {
238+
// no mount status, we are done
239+
return true, nil
240+
}
241+
242+
return false, fmt.Errorf("failed to teardown mount status %q: %w", mountRequest.Metadata().ID(), err)
243+
}
244+
245+
if !okToDestroy {
246+
logger.Warn("waiting for mount status to be torn down")
247+
248+
return false, nil
249+
}
250+
251+
err = r.Destroy(ctx, block.NewMountStatus(block.NamespaceName, mountRequest.Metadata().ID()).Metadata())
252+
if err != nil {
253+
return false, fmt.Errorf("failed to destroy mount status %q: %w", mountRequest.Metadata().ID(), err)
254+
}
255+
256+
logger.Info("mount status destroyed")
257+
258+
return true, nil
259+
}

0 commit comments

Comments
 (0)