Skip to content

Commit 6463de0

Browse files
chore: create & delete sync sessions over gRPC (#119)
Closes #63.
1 parent 2669a1c commit 6463de0

File tree

6 files changed

+155
-50
lines changed

6 files changed

+155
-50
lines changed

Coder-Desktop/Coder-Desktop/Preview Content/PreviewFileSync.swift

+4
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,8 @@ final class PreviewFileSync: FileSyncDaemon {
2121
func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {}
2222

2323
func deleteSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
24+
25+
func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
26+
27+
func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
2428
}

Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncConfig.swift

+16-2
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,15 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
5151
loading = true
5252
defer { loading = false }
5353
do throws(DaemonError) {
54+
// TODO: Support selecting & deleting multiple sessions at once
5455
try await fileSync.deleteSessions(ids: [selection!])
56+
if fileSync.sessionState.isEmpty {
57+
// Last session was deleted, stop the daemon
58+
await fileSync.stop()
59+
}
5560
} catch {
5661
deleteError = error
5762
}
58-
await fileSync.refreshSessions()
5963
selection = nil
6064
}
6165
} label: {
@@ -65,7 +69,17 @@ struct FileSyncConfig<VPN: VPNService, FS: FileSyncDaemon>: View {
6569
if let selectedSession = fileSync.sessionState.first(where: { $0.id == selection }) {
6670
Divider()
6771
Button {
68-
// TODO: Pause & Unpause
72+
Task {
73+
// TODO: Support pausing & resuming multiple sessions at once
74+
loading = true
75+
defer { loading = false }
76+
switch selectedSession.status {
77+
case .paused:
78+
try await fileSync.resumeSessions(ids: [selectedSession.id])
79+
default:
80+
try await fileSync.pauseSessions(ids: [selectedSession.id])
81+
}
82+
}
6983
} label: {
7084
switch selectedSession.status {
7185
case .paused:

Coder-Desktop/Coder-Desktop/Views/FileSync/FileSyncSessionModal.swift

+1-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
6868
}.disabled(loading)
6969
.alert("Error", isPresented: Binding(
7070
get: { createError != nil },
71-
set: { if $0 { createError = nil } }
71+
set: { if !$0 { createError = nil } }
7272
)) {} message: {
7373
Text(createError?.description ?? "An unknown error occurred.")
7474
}
@@ -83,7 +83,6 @@ struct FileSyncSessionModal<VPN: VPNService, FS: FileSyncDaemon>: View {
8383
defer { loading = false }
8484
do throws(DaemonError) {
8585
if let existingSession {
86-
// TODO: Support selecting & deleting multiple sessions at once
8786
try await fileSync.deleteSessions(ids: [existingSession.id])
8887
}
8988
try await fileSync.createSession(

Coder-Desktop/Coder-DesktopTests/Util.swift

+4
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ class MockFileSyncDaemon: FileSyncDaemon {
4848
}
4949

5050
func createSession(localPath _: String, agentHost _: String, remotePath _: String) async throws(DaemonError) {}
51+
52+
func pauseSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
53+
54+
func resumeSessions(ids _: [String]) async throws(VPNLib.DaemonError) {}
5155
}
5256

5357
extension Inspection: @unchecked Sendable, @retroactive InspectionEmissary {}

Coder-Desktop/VPNLib/FileSync/FileSyncDaemon.swift

+10-46
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public protocol FileSyncDaemon: ObservableObject {
1515
func refreshSessions() async
1616
func createSession(localPath: String, agentHost: String, remotePath: String) async throws(DaemonError)
1717
func deleteSessions(ids: [String]) async throws(DaemonError)
18+
func pauseSessions(ids: [String]) async throws(DaemonError)
19+
func resumeSessions(ids: [String]) async throws(DaemonError)
1820
}
1921

2022
@MainActor
@@ -41,6 +43,9 @@ public class MutagenDaemon: FileSyncDaemon {
4143
private let mutagenDataDirectory: URL
4244
private let mutagenDaemonSocket: URL
4345

46+
// Managing sync sessions could take a while, especially with prompting
47+
let sessionMgmtReqTimeout: TimeAmount = .seconds(15)
48+
4449
// Non-nil when the daemon is running
4550
var client: DaemonClient?
4651
private var group: MultiThreadedEventLoopGroup?
@@ -75,6 +80,10 @@ public class MutagenDaemon: FileSyncDaemon {
7580
return
7681
}
7782
await refreshSessions()
83+
if sessionState.isEmpty {
84+
logger.info("No sync sessions found on startup, stopping daemon")
85+
await stop()
86+
}
7887
}
7988
}
8089

@@ -162,7 +171,7 @@ public class MutagenDaemon: FileSyncDaemon {
162171
// Already connected
163172
return
164173
}
165-
group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
174+
group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
166175
do {
167176
channel = try GRPCChannelPool.with(
168177
target: .unixDomainSocket(mutagenDaemonSocket.path),
@@ -252,51 +261,6 @@ public class MutagenDaemon: FileSyncDaemon {
252261
logger.info("\(line, privacy: .public)")
253262
}
254263
}
255-
256-
public func refreshSessions() async {
257-
guard case .running = state else { return }
258-
// TODO: Implement
259-
}
260-
261-
public func createSession(
262-
localPath _: String,
263-
agentHost _: String,
264-
remotePath _: String
265-
) async throws(DaemonError) {
266-
if case .stopped = state {
267-
do throws(DaemonError) {
268-
try await start()
269-
} catch {
270-
state = .failed(error)
271-
throw error
272-
}
273-
}
274-
// TODO: Add session
275-
}
276-
277-
public func deleteSessions(ids _: [String]) async throws(DaemonError) {
278-
// TODO: Delete session
279-
await stopIfNoSessions()
280-
}
281-
282-
private func stopIfNoSessions() async {
283-
let sessions: Synchronization_ListResponse
284-
do {
285-
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
286-
req.selection = .with { selection in
287-
selection.all = true
288-
}
289-
})
290-
} catch {
291-
state = .failed(.daemonStartFailure(error))
292-
return
293-
}
294-
// If there's no configured sessions, the daemon doesn't need to be running
295-
if sessions.sessionStates.isEmpty {
296-
logger.info("No sync sessions found")
297-
await stop()
298-
}
299-
}
300264
}
301265

302266
struct DaemonClient {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import NIOCore
2+
3+
public extension MutagenDaemon {
4+
func refreshSessions() async {
5+
guard case .running = state else { return }
6+
let sessions: Synchronization_ListResponse
7+
do {
8+
sessions = try await client!.sync.list(Synchronization_ListRequest.with { req in
9+
req.selection = .with { selection in
10+
selection.all = true
11+
}
12+
})
13+
} catch {
14+
state = .failed(.grpcFailure(error))
15+
return
16+
}
17+
sessionState = sessions.sessionStates.map { FileSyncSession(state: $0) }
18+
}
19+
20+
func createSession(
21+
localPath: String,
22+
agentHost: String,
23+
remotePath: String
24+
) async throws(DaemonError) {
25+
if case .stopped = state {
26+
do throws(DaemonError) {
27+
try await start()
28+
} catch {
29+
state = .failed(error)
30+
throw error
31+
}
32+
}
33+
let (stream, promptID) = try await host()
34+
defer { stream.cancel() }
35+
let req = Synchronization_CreateRequest.with { req in
36+
req.prompter = promptID
37+
req.specification = .with { spec in
38+
spec.alpha = .with { alpha in
39+
alpha.protocol = .local
40+
alpha.path = localPath
41+
}
42+
spec.beta = .with { beta in
43+
beta.protocol = .ssh
44+
beta.host = agentHost
45+
beta.path = remotePath
46+
}
47+
// TODO: Ingest a config from somewhere
48+
spec.configuration = Synchronization_Configuration()
49+
spec.configurationAlpha = Synchronization_Configuration()
50+
spec.configurationBeta = Synchronization_Configuration()
51+
}
52+
}
53+
do {
54+
// The first creation will need to transfer the agent binary
55+
// TODO: Because this is pretty long, we should show progress updates
56+
// using the prompter messages
57+
_ = try await client!.sync.create(req, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout * 4)))
58+
} catch {
59+
throw .grpcFailure(error)
60+
}
61+
await refreshSessions()
62+
}
63+
64+
func deleteSessions(ids: [String]) async throws(DaemonError) {
65+
// Terminating sessions does not require prompting, according to the
66+
// Mutagen CLI
67+
let (stream, promptID) = try await host(allowPrompts: false)
68+
defer { stream.cancel() }
69+
guard case .running = state else { return }
70+
do {
71+
_ = try await client!.sync.terminate(Synchronization_TerminateRequest.with { req in
72+
req.prompter = promptID
73+
req.selection = .with { selection in
74+
selection.specifications = ids
75+
}
76+
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
77+
} catch {
78+
throw .grpcFailure(error)
79+
}
80+
await refreshSessions()
81+
}
82+
83+
func pauseSessions(ids: [String]) async throws(DaemonError) {
84+
// Pausing sessions does not require prompting, according to the
85+
// Mutagen CLI
86+
let (stream, promptID) = try await host(allowPrompts: false)
87+
defer { stream.cancel() }
88+
guard case .running = state else { return }
89+
do {
90+
_ = try await client!.sync.pause(Synchronization_PauseRequest.with { req in
91+
req.prompter = promptID
92+
req.selection = .with { selection in
93+
selection.specifications = ids
94+
}
95+
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
96+
} catch {
97+
throw .grpcFailure(error)
98+
}
99+
await refreshSessions()
100+
}
101+
102+
func resumeSessions(ids: [String]) async throws(DaemonError) {
103+
// Resuming sessions does not require prompting, according to the
104+
// Mutagen CLI
105+
let (stream, promptID) = try await host(allowPrompts: false)
106+
defer { stream.cancel() }
107+
guard case .running = state else { return }
108+
do {
109+
_ = try await client!.sync.resume(Synchronization_ResumeRequest.with { req in
110+
req.prompter = promptID
111+
req.selection = .with { selection in
112+
selection.specifications = ids
113+
}
114+
}, callOptions: .init(timeLimit: .timeout(sessionMgmtReqTimeout)))
115+
} catch {
116+
throw .grpcFailure(error)
117+
}
118+
await refreshSessions()
119+
}
120+
}

0 commit comments

Comments
 (0)