diff --git a/Package.swift b/Package.swift index f3f0857..756ae71 100644 --- a/Package.swift +++ b/Package.swift @@ -8,10 +8,9 @@ let package = Package( platforms: [.macOS(.v14), .iOS(.v17), .tvOS(.v17)], products: [ .library(name: "HummingbirdRedis", targets: ["HummingbirdRedis"]), - .library(name: "HummingbirdJobsRedis", targets: ["HummingbirdJobsRedis"]), ], dependencies: [ - .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-beta.1"), + .package(url: "https://github.com/hummingbird-project/hummingbird.git", from: "2.0.0-rc.1"), .package(url: "https://github.com/swift-server/RediStack.git", from: "1.4.0"), ], targets: [ @@ -19,22 +18,10 @@ let package = Package( .product(name: "Hummingbird", package: "hummingbird"), .product(name: "RediStack", package: "RediStack"), ]), - .target(name: "HummingbirdJobsRedis", dependencies: [ - .byName(name: "HummingbirdRedis"), - .product(name: "Hummingbird", package: "hummingbird"), - .product(name: "HummingbirdJobs", package: "hummingbird"), - .product(name: "RediStack", package: "RediStack"), - ]), .testTarget(name: "HummingbirdRedisTests", dependencies: [ .byName(name: "HummingbirdRedis"), .product(name: "Hummingbird", package: "hummingbird"), .product(name: "HummingbirdTesting", package: "hummingbird"), ]), - .testTarget(name: "HummingbirdJobsRedisTests", dependencies: [ - .byName(name: "HummingbirdJobsRedis"), - .product(name: "Hummingbird", package: "hummingbird"), - .product(name: "HummingbirdJobs", package: "hummingbird"), - .product(name: "HummingbirdTesting", package: "hummingbird"), - ]), ] ) diff --git a/Sources/HummingbirdJobsRedis/Configuration.swift b/Sources/HummingbirdJobsRedis/Configuration.swift deleted file mode 100644 index 9fa7ae3..0000000 --- a/Sources/HummingbirdJobsRedis/Configuration.swift +++ /dev/null @@ -1,52 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2021-2021 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import NIOCore -@preconcurrency import RediStack - -extension RedisQueue { - /// what to do with failed/processing jobs from last time queue was handled - public enum JobInitialization: Sendable { - case doNothing - case rerun - case remove - } - - /// Redis Job queue configuration - public struct Configuration: Sendable { - let queueKey: RedisKey - let processingQueueKey: RedisKey - let failedQueueKey: RedisKey - let pendingJobInitialization: JobInitialization - let processingJobsInitialization: JobInitialization - let failedJobsInitialization: JobInitialization - let pollTime: Duration - - public init( - queueKey: String = "_hbJobQueue", - pollTime: Duration = .milliseconds(100), - pendingJobInitialization: JobInitialization = .doNothing, - processingJobsInitialization: JobInitialization = .rerun, - failedJobsInitialization: JobInitialization = .doNothing - ) { - self.queueKey = RedisKey(queueKey) - self.processingQueueKey = RedisKey("\(queueKey)Processing") - self.failedQueueKey = RedisKey("\(queueKey)Failed") - self.pollTime = pollTime - self.pendingJobInitialization = pendingJobInitialization - self.processingJobsInitialization = processingJobsInitialization - self.failedJobsInitialization = failedJobsInitialization - } - } -} diff --git a/Sources/HummingbirdJobsRedis/RedisJobQueue.swift b/Sources/HummingbirdJobsRedis/RedisJobQueue.swift deleted file mode 100644 index 4fad1d5..0000000 --- a/Sources/HummingbirdJobsRedis/RedisJobQueue.swift +++ /dev/null @@ -1,253 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2021-2021 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import struct Foundation.Data -import class Foundation.JSONDecoder -import struct Foundation.UUID -import Hummingbird -import HummingbirdJobs -import HummingbirdRedis -import NIOCore -import RediStack - -/// Redis implementation of job queue driver -public final class RedisQueue: JobQueueDriver { - public struct JobID: Sendable, CustomStringConvertible { - let id: String - - public init() { - self.id = UUID().uuidString - } - - /// Initialize JobID from String - /// - Parameter value: string value - public init(_ value: String) { - self.id = value - } - - public init(from decoder: Decoder) throws { - let container = try decoder.singleValueContainer() - self.id = try container.decode(String.self) - } - - public func encode(to encoder: Encoder) throws { - var container = encoder.singleValueContainer() - try container.encode(self.id) - } - - var redisKey: RedisKey { .init(self.description) } - - /// String description of Identifier - public var description: String { self.id } - } - - public enum RedisQueueError: Error, CustomStringConvertible { - case unexpectedRedisKeyType - case jobMissing(JobID) - - public var description: String { - switch self { - case .unexpectedRedisKeyType: - return "Unexpected redis key type" - case .jobMissing(let value): - return "Job associated with \(value) is missing" - } - } - } - - let redisConnectionPool: RedisConnectionPoolService - let configuration: Configuration - let isStopped: ManagedAtomic - - /// Initialize redis job queue - /// - Parameters: - /// - redisConnectionPoolService: Redis connection pool - /// - configuration: configuration - public init(_ redisConnectionPoolService: RedisConnectionPoolService, configuration: Configuration = .init()) { - self.redisConnectionPool = redisConnectionPoolService - self.configuration = configuration - self.isStopped = .init(false) - } - - /// This is run at initialization time. - /// - /// Will push all the jobs in the processing queue back onto to the main queue so they can - /// be rerun - public func onInit() async throws { - try await self.initQueue(queueKey: self.configuration.queueKey, onInit: self.configuration.pendingJobInitialization) - // there shouldn't be any on the processing list, but if there are we should do something with them - try await self.initQueue(queueKey: self.configuration.processingQueueKey, onInit: self.configuration.processingJobsInitialization) - try await self.initQueue(queueKey: self.configuration.failedQueueKey, onInit: self.configuration.failedJobsInitialization) - } - - /// Push job data onto queue - /// - Parameters: - /// - data: Job data - /// - Returns: Queued job - @discardableResult public func push(_ buffer: ByteBuffer) async throws -> JobID { - let jobInstanceID = JobID() - try await self.set(jobId: jobInstanceID, buffer: buffer) - _ = try await self.redisConnectionPool.lpush(jobInstanceID.redisKey, into: self.configuration.queueKey).get() - return jobInstanceID - } - - /// Flag job is done - /// - /// Removes job id from processing queue - /// - Parameters: - /// - jobId: Job id - public func finished(jobId: JobID) async throws { - _ = try await self.redisConnectionPool.lrem(jobId.description, from: self.configuration.processingQueueKey, count: 0).get() - try await self.delete(jobId: jobId) - } - - /// Flag job failed to process - /// - /// Removes job id from processing queue, adds to failed queue - /// - Parameters: - /// - jobId: Job id - public func failed(jobId: JobID, error: Error) async throws { - _ = try await self.redisConnectionPool.lrem(jobId.redisKey, from: self.configuration.processingQueueKey, count: 0).get() - _ = try await self.redisConnectionPool.lpush(jobId.redisKey, into: self.configuration.failedQueueKey).get() - } - - public func stop() async { - self.isStopped.store(true, ordering: .relaxed) - } - - public func shutdownGracefully() async {} - - /// Pop Job off queue and add to pending queue - /// - Parameter eventLoop: eventLoop to do work on - /// - Returns: queued job - func popFirst() async throws -> QueuedJob? { - let pool = self.redisConnectionPool.pool - let key = try await pool.rpoplpush(from: self.configuration.queueKey, to: self.configuration.processingQueueKey).get() - guard !key.isNull else { - return nil - } - guard let key = String(fromRESP: key) else { - throw RedisQueueError.unexpectedRedisKeyType - } - let identifier = JobID(key) - if let buffer = try await self.get(jobId: identifier) { - return .init(id: identifier, jobBuffer: buffer) - } else { - throw RedisQueueError.jobMissing(identifier) - } - } - - /// What to do with queue at initialization - func initQueue(queueKey: RedisKey, onInit: JobInitialization) async throws { - switch onInit { - case .remove: - try await self.remove(queueKey: queueKey) - case .rerun: - try await self.rerun(queueKey: queueKey) - case .doNothing: - break - } - } - - /// Push all the entries from list back onto the main list. - func rerun(queueKey: RedisKey) async throws { - while true { - let key = try await self.redisConnectionPool.rpoplpush(from: queueKey, to: self.configuration.queueKey).get() - if key.isNull { - return - } - } - } - - /// Push all the entries from list back onto the main list. - func remove(queueKey: RedisKey) async throws { - while true { - let key = try await self.redisConnectionPool.rpop(from: queueKey).get() - if key.isNull { - break - } - guard let key = String(fromRESP: key) else { - throw RedisQueueError.unexpectedRedisKeyType - } - let identifier = JobID(key) - try await self.delete(jobId: identifier) - } - } - - func get(jobId: JobID) async throws -> ByteBuffer? { - return try await self.redisConnectionPool.get(jobId.redisKey).get().byteBuffer - } - - func set(jobId: JobID, buffer: ByteBuffer) async throws { - return try await self.redisConnectionPool.set(jobId.redisKey, to: buffer).get() - } - - func delete(jobId: JobID) async throws { - _ = try await self.redisConnectionPool.delete(jobId.redisKey).get() - } -} - -/// extend RedisJobQueue to conform to AsyncSequence -extension RedisQueue { - public typealias Element = QueuedJob - public struct AsyncIterator: AsyncIteratorProtocol { - let queue: RedisQueue - - public func next() async throws -> Element? { - while true { - if self.queue.isStopped.load(ordering: .relaxed) { - return nil - } - if let job = try await queue.popFirst() { - return job - } - // we only sleep if we didn't receive a job - try await Task.sleep(for: self.queue.configuration.pollTime) - } - } - } - - public func makeAsyncIterator() -> AsyncIterator { - return .init(queue: self) - } -} - -extension JobQueueDriver where Self == RedisQueue { - /// Return Redis driver for Job Queue - /// - Parameters: - /// - redisConnectionPoolService: Redis connection pool - /// - configuration: configuration - public static func redis(_ redisConnectionPoolService: RedisConnectionPoolService, configuration: RedisQueue.Configuration = .init()) -> Self { - .init(redisConnectionPoolService, configuration: configuration) - } -} - -// Extend ByteBuffer so that is conforms to `RESPValueConvertible`. Really not sure why -// this isnt available already -#if hasFeature(RetroactiveAttribute) -extension ByteBuffer: @retroactive RESPValueConvertible {} -#else -extension ByteBuffer: RESPValueConvertible {} -#endif -extension ByteBuffer { - public init?(fromRESP value: RESPValue) { - guard let buffer = value.byteBuffer else { return nil } - self = buffer - } - - public func convertedToRESPValue() -> RESPValue { - return .bulkString(self) - } -} diff --git a/Tests/HummingbirdJobsRedisTests/RedisJobsTests.swift b/Tests/HummingbirdJobsRedisTests/RedisJobsTests.swift deleted file mode 100644 index d40ae77..0000000 --- a/Tests/HummingbirdJobsRedisTests/RedisJobsTests.swift +++ /dev/null @@ -1,328 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Hummingbird server framework project -// -// Copyright (c) 2021-2021 the Hummingbird authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See hummingbird/CONTRIBUTORS.txt for the list of Hummingbird authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import Atomics -import Hummingbird -import HummingbirdJobs -@testable import HummingbirdJobsRedis -import HummingbirdRedis -import HummingbirdTesting -import Logging -import NIOConcurrencyHelpers -import RediStack -import ServiceLifecycle -import XCTest - -extension XCTestExpectation { - convenience init(description: String, expectedFulfillmentCount: Int) { - self.init(description: description) - self.expectedFulfillmentCount = expectedFulfillmentCount - } -} - -final class HummingbirdRedisJobsTests: XCTestCase { - func wait(for expectations: [XCTestExpectation], timeout: TimeInterval) async { - #if (os(Linux) && swift(<5.9)) || swift(<5.8) - super.wait(for: expectations, timeout: timeout) - #else - await fulfillment(of: expectations, timeout: timeout) - #endif - } - - static let env = Environment() - static let redisHostname = env.get("REDIS_HOSTNAME") ?? "localhost" - - /// Helper function for test a server - /// - /// Creates test client, runs test function abd ensures everything is - /// shutdown correctly - @discardableResult public func testJobQueue( - numWorkers: Int, - failedJobsInitialization: RedisQueue.JobInitialization = .remove, - test: (JobQueue) async throws -> T - ) async throws -> T { - var logger = Logger(label: "RedisJobsTests") - logger.logLevel = .debug - let redisService = try RedisConnectionPoolService( - .init(hostname: Self.redisHostname, port: 6379), - logger: logger - ) - let jobQueue = JobQueue( - .redis( - redisService, - configuration: .init( - pendingJobInitialization: .remove, - processingJobsInitialization: .remove, - failedJobsInitialization: failedJobsInitialization - ) - ), - numWorkers: numWorkers, - logger: logger - ) - - return try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [redisService, jobQueue], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: Logger(label: "JobQueueService") - ) - ) - group.addTask { - try await serviceGroup.run() - } - let value = try await test(jobQueue) - await serviceGroup.triggerGracefulShutdown() - return value - } - } - - func testBasic() async throws { - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - let jobIdentifer = JobIdentifier(#function) - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) - - await self.wait(for: [expectation], timeout: 5) - } - } - - func testMultipleWorkers() async throws { - let jobIdentifer = JobIdentifier(#function) - let runningJobCounter = ManagedAtomic(0) - let maxRunningJobCounter = ManagedAtomic(0) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 10) - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, context in - let runningJobs = runningJobCounter.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) - if runningJobs > maxRunningJobCounter.load(ordering: .relaxed) { - maxRunningJobCounter.store(runningJobs, ordering: .relaxed) - } - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - context.logger.info("Parameters=\(parameters)") - expectation.fulfill() - runningJobCounter.wrappingDecrement(by: 1, ordering: .relaxed) - } - - try await jobQueue.push(id: jobIdentifer, parameters: 1) - try await jobQueue.push(id: jobIdentifer, parameters: 2) - try await jobQueue.push(id: jobIdentifer, parameters: 3) - try await jobQueue.push(id: jobIdentifer, parameters: 4) - try await jobQueue.push(id: jobIdentifer, parameters: 5) - try await jobQueue.push(id: jobIdentifer, parameters: 6) - try await jobQueue.push(id: jobIdentifer, parameters: 7) - try await jobQueue.push(id: jobIdentifer, parameters: 8) - try await jobQueue.push(id: jobIdentifer, parameters: 9) - try await jobQueue.push(id: jobIdentifer, parameters: 10) - - await self.wait(for: [expectation], timeout: 5) - - XCTAssertGreaterThan(maxRunningJobCounter.load(ordering: .relaxed), 1) - XCTAssertLessThanOrEqual(maxRunningJobCounter.load(ordering: .relaxed), 4) - } - } - - func testErrorRetryCount() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 4) - struct FailedError: Error {} - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer, maxRetryCount: 3) { _, _ in - expectation.fulfill() - throw FailedError() - } - try await jobQueue.push(id: jobIdentifer, parameters: 0) - - await self.wait(for: [expectation], timeout: 5) - try await Task.sleep(for: .milliseconds(200)) - - let failedJobs = try await jobQueue.queue.redisConnectionPool.llen(of: jobQueue.queue.configuration.failedQueueKey).get() - XCTAssertEqual(failedJobs, 1) - - let pendingJobs = try await jobQueue.queue.redisConnectionPool.llen(of: jobQueue.queue.configuration.queueKey).get() - XCTAssertEqual(pendingJobs, 0) - } - } - - func testJobSerialization() async throws { - struct TestJobParameters: Codable { - let id: Int - let message: String - } - let expectation = XCTestExpectation(description: "TestJob.execute was called") - let jobIdentifer = JobIdentifier(#function) - try await self.testJobQueue(numWorkers: 1) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { parameters, _ in - XCTAssertEqual(parameters.id, 23) - XCTAssertEqual(parameters.message, "Hello!") - expectation.fulfill() - } - try await jobQueue.push(id: jobIdentifer, parameters: .init(id: 23, message: "Hello!")) - - await self.wait(for: [expectation], timeout: 5) - } - } - - /// Test job is cancelled on shutdown - func testShutdownJob() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 1) - var logger = Logger(label: "HummingbirdJobsTests") - logger.logLevel = .trace - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer) { _, _ in - expectation.fulfill() - try await Task.sleep(for: .milliseconds(1000)) - } - try await jobQueue.push(id: jobIdentifer, parameters: 0) - await self.wait(for: [expectation], timeout: 5) - - let pendingJobs = try await jobQueue.queue.redisConnectionPool.llen(of: jobQueue.queue.configuration.queueKey).get() - XCTAssertEqual(pendingJobs, 0) - let failedJobs = try await jobQueue.queue.redisConnectionPool.llen(of: jobQueue.queue.configuration.failedQueueKey).get() - let processingJobs = try await jobQueue.queue.redisConnectionPool.llen(of: jobQueue.queue.configuration.processingQueueKey).get() - XCTAssertEqual(failedJobs + processingJobs, 1) - } - } - - /// test job fails to decode but queue continues to process - func testFailToDecode() async throws { - let string: NIOLockedValueBox = .init("") - let jobIdentifer1 = JobIdentifier(#function) - let jobIdentifer2 = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "job was called", expectedFulfillmentCount: 1) - - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(id: jobIdentifer2) { parameters, _ in - string.withLockedValue { $0 = parameters } - expectation.fulfill() - } - try await jobQueue.push(id: jobIdentifer1, parameters: 2) - try await jobQueue.push(id: jobIdentifer2, parameters: "test") - await self.wait(for: [expectation], timeout: 5) - } - string.withLockedValue { - XCTAssertEqual($0, "test") - } - } - - /// creates job that errors on first attempt, and is left on processing queue and - /// is then rerun on startup of new server - func testRerunAtStartup() async throws { - struct RetryError: Error {} - let jobIdentifer = JobIdentifier(#function) - let firstTime = ManagedAtomic(true) - let finished = ManagedAtomic(false) - let failedExpectation = XCTestExpectation(description: "TestJob failed", expectedFulfillmentCount: 1) - let succeededExpectation = XCTestExpectation(description: "TestJob2 succeeded", expectedFulfillmentCount: 1) - let job = JobDefinition(id: jobIdentifer) { _, _ in - if firstTime.compareExchange(expected: true, desired: false, ordering: .relaxed).original { - failedExpectation.fulfill() - throw RetryError() - } - succeededExpectation.fulfill() - finished.store(true, ordering: .relaxed) - } - try await self.testJobQueue(numWorkers: 4) { jobQueue in - jobQueue.registerJob(job) - - try await jobQueue.push(id: jobIdentifer, parameters: 0) - - await self.wait(for: [failedExpectation], timeout: 10) - - // stall to give job chance to start running - try await Task.sleep(for: .milliseconds(50)) - - XCTAssertFalse(firstTime.load(ordering: .relaxed)) - XCTAssertFalse(finished.load(ordering: .relaxed)) - } - - try await self.testJobQueue(numWorkers: 4, failedJobsInitialization: .rerun) { jobQueue in - jobQueue.registerJob(job) - await self.wait(for: [succeededExpectation], timeout: 10) - XCTAssertTrue(finished.load(ordering: .relaxed)) - } - } - - func testMultipleJobQueueHandlers() async throws { - let jobIdentifer = JobIdentifier(#function) - let expectation = XCTestExpectation(description: "TestJob.execute was called", expectedFulfillmentCount: 200) - let logger = { - var logger = Logger(label: "HummingbirdJobsTests") - logger.logLevel = .debug - return logger - }() - let job = JobDefinition(id: jobIdentifer) { parameters, context in - context.logger.info("Parameters=\(parameters)") - try await Task.sleep(for: .milliseconds(Int.random(in: 10..<50))) - expectation.fulfill() - } - let redisService = try RedisConnectionPoolService( - .init(hostname: Self.redisHostname, port: 6379), - logger: Logger(label: "Redis") - ) - let jobQueue = JobQueue( - RedisQueue(redisService), - numWorkers: 2, - logger: logger - ) - jobQueue.registerJob(job) - let jobQueue2 = JobQueue( - RedisQueue(redisService), - numWorkers: 2, - logger: logger - ) - jobQueue2.registerJob(job) - - try await withThrowingTaskGroup(of: Void.self) { group in - let serviceGroup = ServiceGroup( - configuration: .init( - services: [jobQueue, jobQueue2], - gracefulShutdownSignals: [.sigterm, .sigint], - logger: logger - ) - ) - group.addTask { - try await serviceGroup.run() - } - do { - for i in 0..<200 { - try await jobQueue.push(id: jobIdentifer, parameters: i) - } - await self.wait(for: [expectation], timeout: 5) - await serviceGroup.triggerGracefulShutdown() - } catch { - XCTFail("\(String(reflecting: error))") - await serviceGroup.triggerGracefulShutdown() - throw error - } - } - } -} diff --git a/Tests/HummingbirdRedisTests/PersistTests.swift b/Tests/HummingbirdRedisTests/PersistTests.swift index deed7e1..0ead119 100644 --- a/Tests/HummingbirdRedisTests/PersistTests.swift +++ b/Tests/HummingbirdRedisTests/PersistTests.swift @@ -20,7 +20,7 @@ import RediStack import XCTest final class PersistTests: XCTestCase { - static let redisHostname = Environment.shared.get("REDIS_HOSTNAME") ?? "localhost" + static let redisHostname = Environment().get("REDIS_HOSTNAME") ?? "localhost" func createApplication(_ updateRouter: (Router, PersistDriver) -> Void = { _, _ in }) throws -> some ApplicationProtocol { let router = Router()