
Saya menulis beberapa eksperimen seputar Joint, kerangka pemrograman reaktif Apple, untuk mendapatkan wawasan tentang bagaimana Joint menangani kasus-kasus edge yang memberi saya masalah dalam kerangka pemrograman reaktif lainnya. Bagaimana cara kerja langganan? Bagaimana cara menyimpan perhitungan dalam cache? Kapan penerbit dan pelanggan menerbitkan? Jadi dalam kondisi apa Combine aman untuk thread? Bisakah saya masuk kembali? Bisakah Unilever menjamin pengiriman pesanan? Bagaimana kinerja Combine dibandingkan dengan kerangka reaktif yang ada?
Terlalu panjang untuk memuat semuanya dalam satu artikel, jadi saya akan membaginya menjadi tiga bagian:
- Menerapkan kembali protokol inti Combine
- Tiga topik: operasi bersama, siklus hidup referensi bersama, dan pelanggan bersama
- Asinkron, threading, dan kinerja
Artikel ini akan menjadi sepertiga pertama penyelidikan saya, yang mencakup upaya untuk menerapkan kembali tiga protokol utama merger: Publisher
, Subscriber
Dan Subscription
.
unduh: Kode program seri ini, CombineExploration, dapat ditemukan di github.
memperingatkan: Ini bukan pengajaran gabungan. Saya tidak akan menggunakan Combine dengan cara yang menyerupai cara tradisional. Ini akan melihat beberapa kasus edge dalam penggabungan, menguji perilaku yang tidak benar-benar terdokumentasi dan oleh karena itu mungkin berubah di masa mendatang.
penerbit dan pelanggan
Kombinasi Apple dibangun berdasarkan dua protokol utama, Publisher
Dan Subscriber
.
Penafsiran kekanak-kanakan tentang “Gabungkan” itu a
Publisher
Memancarkan serangkaian nilai.
Penjelasan umum ini tidak akurat, namun perbedaan antara penjelasan “naif” dan penjelasan “akurat” ini sangat jarang sehingga kita sering mengabaikannya.
Publisher
Didefinisikan sebagai:
protocol Publisher {
associatedtype Output
associatedtype Failure : Error
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
}
Berdasarkan perjanjian tersebut, a Publisher
Tidak mengeluarkan nilai tetapi menerimanya Subscriber
S. Tentu saja tergantung pada apa Subscriber
Ya, Publisher
Masih dimungkinkan untuk mengirim nilai secara langsung ke sana Subscriber
Itu diterima.
Jadi mari kita lihat Subscriber
Protokol untuk mendapatkan gambaran yang lebih jelas:
protocol Subscriber : CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure : Error
func receive(_ input: Self.Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Self.Failure>)
func receive(subscription: Subscription)
}
Abaikan fungsi terakhir untuk saat ini, fungsi lainnya di sini Subscriber
Tampaknya mendukung penjelasan “naif”: Publisher
menerima Subscriber
s dan dapat mengirim Input
nilai atau Completion
peristiwa diarahkan ke semua yang diketahui Subscriber
S.
Mari kita siapkan “kasus kontrol” yang dapat kita bandingkan dengan pengujian lainnya, dimulai dengan pengujian yang cukup standar Publisher
Dan Subscriber
model, di mana Subscribers.Sink
(Khas Subscriber
) berlangganan PassthroughSubject
(Tidak terlalu khas Publisher
Namun ini berguna dalam pengujian karena memungkinkan kita dengan mudah memasukkan nilai dari luar) dan mencatat nilai yang diteruskan dari subjek ke penerima.
func testSubjectSink() {
let subject = PassthroughSubject<Int, Never>()
var received = [Subscribers.Event<Int, Never>]()
let sink = Subscribers.Sink<Int, Never>(
receiveCompletion: { received.append(.complete($0)) },
receiveValue: { received.append(.value($0)) }
)
subject.subscribe(sink)
subject.send(sequence: 1...3, completion: .finished)
XCTAssertEqual(received, (1...3).asEvents(completion: .finished))
}
Tes ini mencakup beberapa hal yang saya tambahkan sendiri untuk membuat pengujian lebih mudah:
Subscribers.Event
Hanya “salah satu”Value
DanCompletion
Jenis urutan gabungansend(sequence:completion:)
Kirim semua nilai secara berurutan dan lengkapasEvents
Buat sebuah arraySubscribers.Event
dari sebuah arrayValue
dan sebuahCompletion
.
Tes ini sesuai dengan interpretasi “naif”: nilai yang dikirim ke topik diterima oleh penutupan yang kami teruskan ke penerima.
grafik berubah seiring waktu
Bayangkan sebuah tema dasar, A
yang menghasilkan nilai seiring waktu (seperti koneksi jaringan), diikuti dengan node transisi berstatus B
(Misalnya scan
atau pemroses aliran serupa) dan kemudian menjadi pengamat C
(Misalnya Sink
):
func testScan() {
let subjectA = PassthroughSubject<Int, Never>()
let scanB = Publishers.Scan(upstream: subjectA, initialResult: 10) { state, next in state + next }
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = Subscribers.Sink<Int, Never>(
receiveCompletion: { receivedC.append(.complete($0)) },
receiveValue: { receivedC.append(.value($0)) }
)
scanB.subscribe(sinkC)
subjectA.send(sequence: 1...4, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
}
Ada garis konversi tambahan ( scanB
line) tetapi tidak jauh berbeda dibandingkan dengan kasus kontrol aslinya.
Sekarang, apa yang terjadi di tengah jalan A
Streaming data, pengamat baru D
berlangganan B
tidak tahu sama sekali B
Sudah di tengah-tengah keluaran?
Seharusnya pendengar baru D
Dapatkan setengah dari data yang diharapkan, meskipun ia tidak mengetahuinya C
Dan fakta bahwa koneksi sudah dimulai?
Jawabannya rumit. Bergantung pada logika program Anda, Anda mungkin menginginkannya setiap Salah satu opsi berikut:
- siaran grup –
D
Menerima bagian kedua dari nilaiC
menerima - cache – Setengah bagian depan empuk dan
D
Terima bagian pertama pesan segera setelah bergabung, serta nilai-nilai baru seperti multicast - nilai terbaru –
D
Segera terima nilai yang terakhir dipancarkan serta nilai baru seperti multicast - Tembolok khusus –
D
Hanya menerima jumlah yang diperlukan (misalnya sejak keyframe terakhir atau titik pemulihan) dan nilai baru (misalnya multicast) - berlangganan kembali –
D
harus memicu semua node upstream untuk memulai kembali pekerjaannya, kembali ke jaringan dan meminta ulang semua data, melakukan semua perhitungan lagi
Pada artikel ini saya hanya akan fokus pada opsi terakhir karena ini bisa dibilang merupakan perilaku default dalam penggabungan. Pada artikel berikutnya saya akan memperkenalkan metode lain.
Namun untuk saat ini, berikut contoh perilaku berlangganan kembali:
func testSequenceABCD() {
let sequenceA = Publishers.Sequence<ClosedRange<Int>, Never>(sequence: 1...4)
let scanB = Publishers.Scan(upstream: sequenceA, initialResult: 10) { state, next in state + next }
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = Subscribers.Sink<Int, Never>(
receiveCompletion: { receivedC.append(.complete($0)) },
receiveValue: { receivedC.append(.value($0)) }
)
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = Subscribers.Sink<Int, Never>(
receiveCompletion: { receivedD.append(.complete($0)) },
receiveValue: { receivedD.append(.value($0)) }
)
scanB.subscribe(sinkC)
scanB.subscribe(sinkD)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
}
Tidak ada satu pun node di sini yang dibuat ulang, dan yang paling penting, B
Node – penuh keadaan scan
Prosesor – dibagikan antar langganan, tetapi masing-masing C
Dan D
Menerima versi nilai yang independen.
Jika menurut Anda sesuatu yang aneh terjadi karena urutannya tidak benar-benar tumpang tindih dalam waktu, berikut adalah pengujian yang setara di mana nilai urutan diteruskan secara manual dengan cara yang tumpang tindih:
func testOverlappingABCD() {
var subjects = [PassthroughSubject<Int, Never>]()
let deferred = Deferred { () -> PassthroughSubject<Int, Never> in
let request = PassthroughSubject<Int, Never>()
subjects.append(request)
return request
}
let scanB = Publishers.Scan(upstream: deferred, initialResult: 10) { state, next in state + next }
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = Subscribers.Sink<Int, Never>(
receiveCompletion: { receivedC.append(.complete($0)) },
receiveValue: { receivedC.append(.value($0)) }
)
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = Subscribers.Sink<Int, Never>(
receiveCompletion: { receivedD.append(.complete($0)) },
receiveValue: { receivedD.append(.value($0)) }
)
scanB.subscribe(sinkC)
subjects[0].send(sequence: 1...2, completion: nil)
scanB.subscribe(sinkD)
subjects[0].send(sequence: 3...4, completion: .finished)
subjects[1].send(sequence: 1...4, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
}
Kasus uji ini menunjukkan bahwa penafsiran “naif” terhadap Combine tidak dapat menjelaskan dengan tepat cara kerja Combine dalam semua situasi. Meskipun ada dua PassthroughSubject
s, dua Subscriber.Sink
s, hanya satu scanB
node di Publisher
grafik, tetapi berperilaku seperti dua node yang sangat berbeda – yang satu mewakili sinkC
dan satu untuk sinkD
.
Berlangganan, tipe paling tersembunyi
Bagaimana cara kerjanya?
Meskipun programmer membuat grafik Publisher
s, dan peta bayangan dari instance lain yang benar-benar melakukan pemrosesan dan pengiriman nilai. Kita bisa melihat plot bayangan ini di fungsi terakhir Subscriber
protokol.
func receive(subscription: Subscription)
setiap Publisher
Diarsir oleh sebuah contoh di bagan Anda Subscription
setiap aktif Subscriber
.
Kami tidak melihat efek dari bayangan ini Subscription
gambar pertama testScan
Misalnya karena berbagi PassthroughSubject
Gabungkan semua langganan menjadi satu, tetapi saat kita beralih menggunakan Deferred
grafik menjadi tidak kusut dan independen, dan kita dapat melihat efek kelipatannya Subscription
berbaring scan
simpul.
penjelasan akurat tentang persatuan adalah mengirim dan memproses nilai melalui grafik
Subscription
Misalnya, konstruksi yang malasPublisher
Contoh berdasarkan setiap langganan.
Kami biasanya tidak berinteraksi Subscription
Contoh. Subscription
Instance dibuat secara otomatis Publisher
ketika sebuah Subscriber
berlangganan. bagan Subscription
Contohnya mencerminkan grafik Publisher
S.
Anda dapat melihat mengapa ada perbedaan di antara keduanya Publisher
grafik dan Subscriber
Diagram (perbedaan antara penafsiran yang “naif” dan “akurat”) dapat membingungkan. Yang lebih menambah kebingungan adalah tidak adanya implementasi publik yang tersedia Subscription
(Saya mengabaikan Subscriptions.empty
Ini adalah pengganti yang mengabaikan semuanya).
Konsep berlangganan diperkenalkan oleh ekstensi Reaktif untuk .NET, yang berupaya membuat setiap mutasi grafik berperilaku seperti grafik yang benar-benar independen dan tidak terkait – sama seperti grafik yang muncul dalam bahasa pemrograman yang berfungsi secara ketat. Namun, bahasa pemrograman yang berfungsi secara ketat menghasilkan fungsi cache, sehingga menghindari penghitungan ulang nilai upstream yang berlebihan. Di Swift, jika kita tidak menyimpannya sendiri dalam cache, semuanya akan terduplikasi.
Jika saya ingin mengulangi semua pemrosesan, saya akan membangun kembali grafik penerbit.
Ketika saya menulis kerangka pemrograman reaktif saya sendiri, CwlSignal, yang utama Signal
Contohnya adalah diagram penyampaian – interpretasi “naif” sama dengan interpretasi “akurat”. Saya telah menangani masalah banyak pelanggan dengan cara yang berbeda: Signal
Sebuah node hanya mengizinkan satu node anak untuk diamati. Untuk kasus tertentu yang memerlukan banyak pendengar, CwlSignal menyediakan yang khusus SignalMulti
Opsi pengkodean node seperti “multicast”, “continuous” (cache terbaru), dan “play” (cache semua). Namun sengaja tidak disediakan opsi berlangganan ulang.
Bagaimanapun, mari kita lihat definisi dasarnya Subscription
:
public protocol Subscription : Cancellable, CustomCombineIdentifierConvertible {
func request(_ demand: Subscribers.Demand)
}
Sangat ringkas. Jika ini adalah definisi lengkap dari grafik pengiriman nilai bayangan, maka hal ini tidak mengungkapkan banyak hal.
Implementasi yang disesuaikan
Untung, Subscription
Bukan tidak mungkin untuk dipahami. Biasanya hanya melakukan semua peran Publisher
Dan Subscriber
Dalam interpretasi “naif”: ia menerima nilai, memprosesnya, dan meneruskannya ke sepanjang garis.
satu Subscription
Semua konten penting yang terkait dengannya harus disalin Publisher
menyalin penutupan dan status apa pun dari nilai awal yang disimpan Publisher
. Jadi, Subscription
Jadilah mandiri dan miliki semua yang diperlukan untuk menangani pemrosesan tanpa bantuan lebih lanjut Publisher
.
Bagian tersulitnya adalah menentukan kapan harus membuat Subscriber
dari a Publisher
Dan satukan semuanya. Saya mengambil langkah-langkah berikut dengan penekanan Publisher.receive
setelah beberapa percobaan:
catatan: kata-kata
Subscriber
DanSubscription
Sangat mirip. Saya yakin ini akan membingungkan (dan membingungkan untuk ditulis).
- Anda memanggil kombinasi tersebut
subscribe
berfungsi di AndaPublisher
melalui AndaSubscriber
. - Ini akan memanggil Anda
Publisher
darireceive
peralihan fungsiSubscriber
Anda berikan kepadasubscribe
Fungsi - ada
receive
FungsiPublisher
Buat khususSubscription
misalnya, itu juga harus sesuai denganSubscriber
dan harus mempertahankan referensi ke hilirSubscriber
. - milikmu
Publisher
panggilan masuksubscribe
di hulunyaPublisher
(jika ada) melalui bea cukaiSubscription
(Itulah mengapa itu harus sesuaiSubscriber
). - hulu
Publisher
panggilan masukreceive
sesuai dengan kebiasaanmuSubscription
meneruskannya ke instance langganannya sendiri. - milikmu
Subscriber
harus meneleponreceive
bagian hilirnyaSubscriber
- hilir
Subscriber
akan meneleponrequest
di dalam kamuSubscription
dan milikmuSubscription
harus dipanggilrequest
di hulunyaSubscription
.
Langkah-langkah tepatnya cenderung bervariasi tergantung pada apakah Anda Publisher
Ada hulunya Publisher
atau a Subject
.
Mari fokus pada transformasi Publisher
dengan hulu Publisher
karena ini adalah kasus yang umum. Seperti itu Publisher
akan ada satu receive
Fungsinya terlihat seperti ini:
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
let downstream = AnySubscriber(subscriber)
let behavior = CustomTransformer.Behavior(
downstream: downstream,
processor: processor,
state: initialState)
let subscription = CustomSubscription(behavior: behavior)
upstreamPublisher.subscribe(subscription)
}
Ada tip yang sangat penting di sini: meskipun kita adalah node hilir (dan dapat menerapkannya Subscriber
untuk diri kita sendiri), kita tidak akan meneruskan diri kita ke sana upstreamPublisher
. Sebaliknya melalui yang baru dibangun Subscription
misalnya saja. itu sebabnya Subscription
Implementasinya sering kali Subscriber
Implementasi juga. ini Subscription
Instance adalah grafik pengiriman independennya sendiri, yang hanya terhubung ke instance lain Subscription
Contoh.
Saya memilih untuk merancang langganan khusus saya menjadi dua bagian: pembungkus (untuk menerapkan perilaku yang saling eksklusif) dan kontrak perilaku (untuk menerapkan Publisher
perilaku tertentu dalam mutex). Oleh karena itu, pembungkus mutex diimplementasikan hanya sekali dan memiliki perilaku yang lebih sederhana.
Berikut adalah antarmuka pembungkusnya:
public struct CustomSubscription<Content: SubscriptionBehavior>: Subscriber, Subscription {
public typealias Input = Content.Input
public typealias Failure = Content.Failure
public var combineIdentifier: CombineIdentifier { return content.combineIdentifier }
let recursiveMutex = NSRecursiveLock()
let content: Content
}
Dan SubscriptionBehavior
Tampilan dalamnya seperti ini:
public protocol SubscriptionBehavior: class, Cancellable, CustomCombineIdentifierConvertible {
associatedtype Input
associatedtype Failure: Error
associatedtype Output
associatedtype OutputFailure: Error
var demand: Subscribers.Demand { get set }
var upstream: Subscription? { get set }
var downstream: AnySubscriber<Output, OutputFailure> { get }
func request(_ d: Subscribers.Demand)
func receive(_ input: Input) -> Subscribers.Demand
func receive(completion: Subscribers.Completion<Failure>)
}
Maka implementasinya sangat sederhana: nilainya lolos receive
berfungsi dan melakukan pemrosesan yang sesuai berdasarkan instance yang dibuat dan dikirim ke penerbit hilir AnySubscriber
.
Anda dapat melihat implementasi lengkapnya CustomSubject
, CustomScan
, CustomSubscription
Dan CustomSink
Di Repositori Penemuan Portofolio.
Beginilah cara kerjanya Subscription
Sepertinya bersatu? Hampir pasti tidak. Sejauh yang saya tahu, Combine menggunakan metode yang disebut Conduit
Ini menerapkan mutex sekali di awal, bukan setiap saat Publisher
Tahapan dalam pipa. Conduit
memang menggunakan implementasi mutex rekursif (lebih lanjut tentang itu di Bagian 3 seri ini), tetapi tampaknya memang demikian os_unfair_lock
(biasanya mutex non-rekursif).
Namun, implementasi ini berperilaku benar dan beroperasi dengan benar dengan implementasi Bersama yang resmi.
Ini adalah yang sebelumnya testOverlappingABCD
Penulisan ulang implementasi ini menunjukkan bahwa implementasi tersebut dapat digunakan sebagai pengganti implementasi default:
func testCustomABCD() {
var subjects = [CustomSubject<Int, Never>]()
let deferred = Deferred { () -> CustomSubject<Int, Never> in
let request = CustomSubject<Int, Never>()
subjects.append(request)
return request
}
let scanB = CustomScan(upstream: deferred, initialResult: 10) { state, next in state + next }
var receivedC = [Subscribers.Event<Int, Never>]()
let sinkC = CustomSink<Int, Never>(
receiveCompletion: { receivedC.append(.complete($0)) },
receiveValue: { receivedC.append(.value($0)) }
)
var receivedD = [Subscribers.Event<Int, Never>]()
let sinkD = CustomSink<Int, Never>(
receiveCompletion: { receivedD.append(.complete($0)) },
receiveValue: { receivedD.append(.value($0)) }
)
scanB.subscribe(sinkC)
subjects[0].send(sequence: 1...2, completion: nil)
scanB.subscribe(sinkD)
subjects[0].send(sequence: 3...4, completion: .finished)
subjects[1].send(sequence: 1...4, completion: .finished)
XCTAssertEqual(receivedC, [11, 13, 16, 20].asEvents(completion: .finished))
XCTAssertEqual(receivedD, [11, 13, 16, 20].asEvents(completion: .finished))
}
sebagai kesimpulan
unduh: Kode program seri ini, CombineExploration, dapat ditemukan di github.
kita sering berbicara tentang kita Publisher
Grafik bertindak seolah-olah mereka melakukan penghitungan dan mengeluarkan nilai, namun kenyataannya tidak demikian. Nilai dalam kombinasi diberikan oleh Subscription
bagan dan ulangi perhitungan untuk masing-masingnya Subscription
grafis.
perbedaan antara Publisher
Dan Subscription
Grafik ada untuk mencegah pelanggan yang berbeda saling mengganggu. Untuk melakukan ini, semua Status streaming yang Anda atur dalam penyesuaian Publisher
Harus disalin ke Subscription
Dan terjadi mutasi di sana.
mengharapkan…
Dalam kebanyakan kasus, kami tidak ingin penghitungan yang berlebihan. Jika memungkinkan, kami ingin menghitungnya satu kali untuk setiap nilai Publisher
bagan, kami menginginkan nilai terbaru dibagikan antara semua pelanggan.
Bagaimana kita menghindari “berlangganan kembali” dalam portofolio kita? Bagaimana cara mendapatkan hasil multicast atau cache? kita perlu menggunakan connect
atau menyimpan kelebihannya subscribe
Batalkan seperti yang kami lakukan di RxSwift? Oleh karena itu, secara umum, apa yang diperlukan agar langganan gabungan tetap hidup? Apa aturan untuk menggabungkan kekuatan untuk menjaga sesuatu (Publishers
, Subscribers
atau Subscriptions
) masih hidup?
Itu yang akan saya bahas pada postingan saya selanjutnya: berbagi.