-
Notifications
You must be signed in to change notification settings - Fork 433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Combine Interoperability: first iteration #776
Open
andersio
wants to merge
6
commits into
master
Choose a base branch
from
anders/combine-interop
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
d718b17
Convert any Signal, Producer or Property to a Combine publisher via `…
andersio 6a0a681
Convert Combine publishers to producer via `producer()`. A few utilit…
andersio 4cffe30
Update the availability annotations.
andersio d7b1a5d
Remove `Cancellable` conformances.
andersio d47129e
Add `eraseToAnyPublisher()`.
andersio dcbf572
Remove one level of indentation.
andersio File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ PlaygroundUtility.remap | |
# SwiftPM | ||
.build | ||
Packages | ||
.swiftpm | ||
|
||
# Carthage | ||
Carthage/Build | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
#if canImport(Combine) | ||
import Combine | ||
|
||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) | ||
extension Publisher { | ||
public func producer() -> SignalProducer<Output, Failure> { | ||
return SignalProducer { observer, lifetime in | ||
lifetime += self.sink( | ||
receiveCompletion: { completion in | ||
switch completion { | ||
case let .failure(error): | ||
observer.send(error: error) | ||
case .finished: | ||
observer.sendCompleted() | ||
} | ||
}, | ||
receiveValue: observer.send(value:) | ||
) | ||
} | ||
} | ||
} | ||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
#if canImport(Combine) | ||
import Combine | ||
|
||
extension SignalProducerConvertible { | ||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) | ||
public func eraseToAnyPublisher() -> AnyPublisher<Value, Error> { | ||
publisher().eraseToAnyPublisher() | ||
} | ||
|
||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) | ||
public func publisher() -> ProducerPublisher<Value, Error> { | ||
ProducerPublisher(base: producer) | ||
} | ||
} | ||
|
||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) | ||
public struct ProducerPublisher<Output, Failure: Swift.Error>: Publisher { | ||
public let base: SignalProducer<Output, Failure> | ||
|
||
public init(base: SignalProducer<Output, Failure>) { | ||
self.base = base | ||
} | ||
|
||
public func receive<S>(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure { | ||
let subscription = ProducerSubscription(subscriber: subscriber, base: base) | ||
subscription.bootstrap() | ||
} | ||
} | ||
|
||
final class ProducerSubscription<S: Subscriber>: Combine.Subscription { | ||
typealias Output = S.Output | ||
typealias Failure = S.Failure | ||
|
||
let subscriber: S | ||
let base: SignalProducer<Output, Failure> | ||
let state: Atomic<State> | ||
|
||
init(subscriber: S, base: SignalProducer<Output, Failure>) { | ||
self.subscriber = subscriber | ||
self.base = base | ||
self.state = Atomic(State()) | ||
} | ||
|
||
func bootstrap() { | ||
subscriber.receive(subscription: self) | ||
} | ||
|
||
func request(_ incoming: Subscribers.Demand) { | ||
let response: DemandResponse = state.modify { state in | ||
guard state.hasCancelled == false else { | ||
return .noAction | ||
} | ||
|
||
guard state.hasStarted else { | ||
state.hasStarted = true | ||
state.requested = incoming | ||
return .startUpstream | ||
} | ||
|
||
state.requested = state.requested + incoming | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be rewritten |
||
let unsatified = state.requested - state.satisfied | ||
|
||
if let max = unsatified.max { | ||
let dequeueCount = Swift.min(state.buffer.count, max) | ||
state.satisfied += dequeueCount | ||
|
||
defer { state.buffer.removeFirst(dequeueCount) } | ||
return .satisfyDemand(Array(state.buffer.prefix(dequeueCount))) | ||
} else { | ||
defer { state.buffer = [] } | ||
return .satisfyDemand(state.buffer) | ||
} | ||
} | ||
|
||
switch response { | ||
case let .satisfyDemand(output): | ||
var demand: Subscribers.Demand = .none | ||
|
||
for output in output { | ||
demand += subscriber.receive(output) | ||
} | ||
|
||
if demand != .none { | ||
request(demand) | ||
} | ||
|
||
case .startUpstream: | ||
let disposable = base.start { [weak self] event in | ||
guard let self = self else { return } | ||
|
||
switch event { | ||
case let .value(output): | ||
let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in | ||
guard state.hasCancelled == false else { return (false, false) } | ||
|
||
let unsatified = state.requested - state.satisfied | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [typo] unsatisfied |
||
|
||
if let count = unsatified.max, count >= 1 { | ||
assert(state.buffer.count == 0) | ||
state.satisfied += 1 | ||
return (true, false) | ||
} else if unsatified == .unlimited { | ||
assert(state.buffer.isEmpty) | ||
return (true, true) | ||
} else { | ||
assert(state.requested == state.satisfied) | ||
state.buffer.append(output) | ||
return (false, false) | ||
} | ||
} | ||
|
||
if shouldSendImmediately { | ||
let demand = self.subscriber.receive(output) | ||
|
||
if isDemandUnlimited == false && demand != .none { | ||
self.request(demand) | ||
} | ||
} | ||
|
||
case .completed, .interrupted: | ||
self.cancel() | ||
self.subscriber.receive(completion: .finished) | ||
|
||
case let .failed(error): | ||
self.cancel() | ||
self.subscriber.receive(completion: .failure(error)) | ||
} | ||
} | ||
|
||
let shouldDispose: Bool = state.modify { state in | ||
guard state.hasCancelled == false else { return true } | ||
state.producerSubscription = disposable | ||
return false | ||
} | ||
|
||
if shouldDispose { | ||
disposable.dispose() | ||
} | ||
|
||
case .noAction: | ||
break | ||
} | ||
} | ||
|
||
func cancel() { | ||
let disposable = state.modify { $0.cancel() } | ||
disposable?.dispose() | ||
} | ||
|
||
struct State { | ||
var requested: Subscribers.Demand = .none | ||
var satisfied: Subscribers.Demand = .none | ||
|
||
var buffer: [Output] = [] | ||
|
||
var producerSubscription: Disposable? | ||
var hasStarted = false | ||
var hasCancelled = false | ||
|
||
init() { | ||
producerSubscription = nil | ||
hasStarted = false | ||
hasCancelled = false | ||
} | ||
|
||
mutating func cancel() -> Disposable? { | ||
hasCancelled = true | ||
defer { producerSubscription = nil } | ||
return producerSubscription | ||
} | ||
} | ||
|
||
enum DemandResponse { | ||
case startUpstream | ||
case satisfyDemand([Output]) | ||
case noAction | ||
} | ||
} | ||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
#if canImport(Combine) | ||
import Combine | ||
|
||
extension Lifetime { | ||
@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) | ||
@discardableResult | ||
public static func += <C: Cancellable>(lhs: Lifetime, rhs: C?) -> Disposable? { | ||
rhs.flatMap { lhs.observeEnded($0.cancel) } | ||
} | ||
} | ||
#endif | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been renamed to
.Input