Reading Large-Scale Code: Challenges and Practice (4-EX) Side Story: A Trial Run at Reading a 1000-Line Function
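The previous chapter showed how to read code by tracing backwards. The sample code there was too short, though. In practice, functions that run to several hundred or even a thousand lines are common, and they turn up even in some well-regarded open-source projects.
In this chapter we walk you through a 1000-line function. With code this long, reading with a specific purpose in mind is far more efficient.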
The function we will read is ChannelManager::read, taken from rust-lightning. Its job is to restore a ChannelManager object from any binary data source, such as a disk.
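Our goal is to understand exactly what it reads from disk, so that we can analyze whether it is safe to keep that data on disk.
Setting a reasonable goal matters. A thousand lines is enough to implement an entire compiler in some languages, so there is no need to waste time reading every detail of an ordinary data-system project.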
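One way to act on such a goal is to make a rough first pass mechanically before reading in detail. The sketch below is not part of rust-lightning; `list_direct_reads` is a hypothetical helper that scans the text of a listing and collects every `let` binding filled directly by `Readable::read(reader)`. It assumes one statement per line and deliberately skips reads done through other forms (for example `Channel::read(...)`, `pending_forwards.push(...)`, or the `read_tlv_fields!` macro), so treat its output as a starting checklist rather than a complete inventory.

```rust
/// Collects (name, type) for each `let` binding that is filled directly by
/// `Readable::read(reader)`. Untyped bindings are reported as "(inferred)".
/// Hypothetical helper for illustration only.
fn list_direct_reads(source: &str) -> Vec<(String, String)> {
    let mut reads = Vec::new();
    for line in source.lines() {
        let line = line.trim();
        if !line.contains("Readable::read(reader)") {
            continue;
        }
        // Tolerate a leading line number by searching for `let ` anywhere.
        let rest = match line.find("let ") {
            Some(pos) => &line[pos + 4..],
            None => continue, // e.g. `x.field = Readable::read(reader)?;`
        };
        let binding = match rest.split_once('=') {
            Some((binding, _)) => binding,
            None => continue,
        };
        let (name, ty) = match binding.split_once(':') {
            Some((name, ty)) => (name, ty.trim()),
            None => (binding, "(inferred)"),
        };
        let name = name.trim();
        let name = name.strip_prefix("mut ").unwrap_or(name).trim();
        reads.push((name.to_string(), ty.to_string()));
    }
    reads
}

fn main() {
    // A few lines copied from the listing, numbers included.
    let sample = "\
19 let chain_hash: ChainHash = Readable::read(reader)?;
20 let best_block_height: u32 = Readable::read(reader)?;
25 let channel_count: u64 = Readable::read(reader)?;
248 let short_channel_id = Readable::read(reader)?;";
    for (name, ty) in list_direct_reads(sample) {
        println!("{name}: {ty}");
    }
}
```

Running a pass like this over the whole function gives a list of fields to check against the goal, and the lines it skips show where closer reading is still needed.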
Code
Try reading it yourself first.
1impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
2 ReadableArgs<ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, L>>
3 for (BlockHash, ChannelManager<M, T, ES, NS, SP, F, R, L>)
4where
5 M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
6 T::Target: BroadcasterInterface,
7 ES::Target: EntropySource,
8 NS::Target: NodeSigner,
9 SP::Target: SignerProvider,
10 F::Target: FeeEstimator,
11 R::Target: Router,
12 L::Target: Logger,
13{
14 fn read<Reader: io::Read>(
15 reader: &mut Reader, mut args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, L>,
16 ) -> Result<Self, DecodeError> {
17 let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
18
19 let chain_hash: ChainHash = Readable::read(reader)?;
20 let best_block_height: u32 = Readable::read(reader)?;
21 let best_block_hash: BlockHash = Readable::read(reader)?;
22
23 let mut failed_htlcs = Vec::new();
24
25 let channel_count: u64 = Readable::read(reader)?;
26 let mut funding_txo_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
27 let mut funded_peer_channels: HashMap<PublicKey, HashMap<ChannelId, ChannelPhase<SP>>> =
28 hash_map_with_capacity(cmp::min(channel_count as usize, 128));
29 let mut outpoint_to_peer = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
30 let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
31 let mut channel_closures = VecDeque::new();
32 let mut close_background_events = Vec::new();
33 let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
34 for _ in 0..channel_count {
35 let mut channel: Channel<SP> = Channel::read(
36 reader,
37 (
38 &args.entropy_source,
39 &args.signer_provider,
40 best_block_height,
41 &provided_channel_type_features(&args.default_config),
42 args.color_source.clone(),
43 ),
44 )?;
45 let logger = WithChannelContext::from(&args.logger, &channel.context);
46 let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
47 funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
48 funding_txo_set.insert(funding_txo.clone());
49 if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
50 if channel.get_cur_holder_commitment_transaction_number()
51 > monitor.get_cur_holder_commitment_number()
52 || channel.get_revoked_counterparty_commitment_transaction_number()
53 > monitor.get_min_seen_secret()
54 || channel.get_cur_counterparty_commitment_transaction_number()
55 > monitor.get_cur_counterparty_commitment_number()
56 || channel.context.get_latest_monitor_update_id()
57 < monitor.get_latest_update_id()
58 {
59 // But if the channel is behind of the monitor, close the channel:
60 log_error!(
61 logger,
62 "A ChannelManager is stale compared to the current ChannelMonitor!"
63 );
64 log_error!(logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast.");
65 if channel.context.get_latest_monitor_update_id()
66 < monitor.get_latest_update_id()
67 {
68 log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
69 &channel.context.channel_id(), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id());
70 }
71 if channel.get_cur_holder_commitment_transaction_number()
72 > monitor.get_cur_holder_commitment_number()
73 {
74 log_error!(logger, " The ChannelMonitor for channel {} is at holder commitment number {} but the ChannelManager is at holder commitment number {}.",
75 &channel.context.channel_id(), monitor.get_cur_holder_commitment_number(), channel.get_cur_holder_commitment_transaction_number());
76 }
77 if channel.get_revoked_counterparty_commitment_transaction_number()
78 > monitor.get_min_seen_secret()
79 {
80 log_error!(logger, " The ChannelMonitor for channel {} is at revoked counterparty transaction number {} but the ChannelManager is at revoked counterparty transaction number {}.",
81 &channel.context.channel_id(), monitor.get_min_seen_secret(), channel.get_revoked_counterparty_commitment_transaction_number());
82 }
83 if channel.get_cur_counterparty_commitment_transaction_number()
84 > monitor.get_cur_counterparty_commitment_number()
85 {
86 log_error!(logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
87 &channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
88 }
89 let mut shutdown_result =
90 channel.context.force_shutdown(true, ClosureReason::OutdatedChannelManager);
91 if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
92 return Err(DecodeError::InvalidValue);
93 }
94 if let Some((counterparty_node_id, funding_txo, channel_id, update)) =
95 shutdown_result.monitor_update
96 {
97 close_background_events.push(
98 BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
99 counterparty_node_id,
100 funding_txo,
101 channel_id,
102 update,
103 },
104 );
105 }
106 failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
107 channel_closures.push_back((
108 events::Event::ChannelClosed {
109 channel_id: channel.context.channel_id(),
110 user_channel_id: channel.context.get_user_id(),
111 reason: ClosureReason::OutdatedChannelManager,
112 counterparty_node_id: Some(channel.context.get_counterparty_node_id()),
113 channel_capacity_sats: Some(channel.context.get_value_satoshis()),
114 channel_funding_txo: channel.context.get_funding_txo(),
115 },
116 None,
117 ));
118 for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
119 let mut found_htlc = false;
120 for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
121 if *channel_htlc_source == monitor_htlc_source {
122 found_htlc = true;
123 break;
124 }
125 }
126 if !found_htlc {
127 // If we have some HTLCs in the channel which are not present in the newer
128 // ChannelMonitor, they have been removed and should be failed back to
129 // ensure we don't forget them entirely. Note that if the missing HTLC(s)
130 // were actually claimed we'd have generated and ensured the previous-hop
131 // claim update ChannelMonitor updates were persisted prior to persising
132 // the ChannelMonitor update for the forward leg, so attempting to fail the
133 // backwards leg of the HTLC will simply be rejected.
134 log_info!(logger,
135 "Failing HTLC with hash {} as it is missing in the ChannelMonitor for channel {} but was present in the (stale) ChannelManager",
136 &channel.context.channel_id(), &payment_hash);
137 failed_htlcs.push((
138 channel_htlc_source.clone(),
139 *payment_hash,
140 channel.context.get_counterparty_node_id(),
141 channel.context.channel_id(),
142 ));
143 }
144 }
145 } else {
146 channel.on_startup_drop_completed_blocked_mon_updates_through(
147 &logger,
148 monitor.get_latest_update_id(),
149 );
150 log_info!(logger, "Successfully loaded channel {} at update_id {} against monitor at update id {} with {} blocked updates",
151 &channel.context.channel_id(), channel.context.get_latest_monitor_update_id(),
152 monitor.get_latest_update_id(), channel.blocked_monitor_updates_pending());
153 if let Some(short_channel_id) = channel.context.get_short_channel_id() {
154 short_to_chan_info.insert(
155 short_channel_id,
156 (
157 channel.context.get_counterparty_node_id(),
158 channel.context.channel_id(),
159 ),
160 );
161 }
162 if let Some(funding_txo) = channel.context.get_funding_txo() {
163 outpoint_to_peer
164 .insert(funding_txo, channel.context.get_counterparty_node_id());
165 }
166 match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
167 hash_map::Entry::Occupied(mut entry) => {
168 let by_id_map = entry.get_mut();
169 by_id_map.insert(
170 channel.context.channel_id(),
171 ChannelPhase::Funded(channel),
172 );
173 },
174 hash_map::Entry::Vacant(entry) => {
175 let mut by_id_map = new_hash_map();
176 by_id_map.insert(
177 channel.context.channel_id(),
178 ChannelPhase::Funded(channel),
179 );
180 entry.insert(by_id_map);
181 },
182 }
183 }
184 } else if channel.is_awaiting_initial_mon_persist() {
185 // If we were persisted and shut down while the initial ChannelMonitor persistence
186 // was in-progress, we never broadcasted the funding transaction and can still
187 // safely discard the channel.
188 let _ = channel.context.force_shutdown(false, ClosureReason::DisconnectedPeer);
189 channel_closures.push_back((
190 events::Event::ChannelClosed {
191 channel_id: channel.context.channel_id(),
192 user_channel_id: channel.context.get_user_id(),
193 reason: ClosureReason::DisconnectedPeer,
194 counterparty_node_id: Some(channel.context.get_counterparty_node_id()),
195 channel_capacity_sats: Some(channel.context.get_value_satoshis()),
196 channel_funding_txo: channel.context.get_funding_txo(),
197 },
198 None,
199 ));
200 } else {
201 log_error!(
202 logger,
203 "Missing ChannelMonitor for channel {} needed by ChannelManager.",
204 &channel.context.channel_id()
205 );
206 log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
207 log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
208 log_error!(
209 logger,
210 " Without the ChannelMonitor we cannot continue without risking funds."
211 );
212 log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
213 return Err(DecodeError::InvalidValue);
214 }
215 }
216
217 for (funding_txo, monitor) in args.channel_monitors.iter() {
218 if !funding_txo_set.contains(funding_txo) {
219 let logger = WithChannelMonitor::from(&args.logger, monitor);
220 let channel_id = monitor.channel_id();
221 log_info!(
222 logger,
223 "Queueing monitor update to ensure missing channel {} is force closed",
224 &channel_id
225 );
226 let monitor_update = ChannelMonitorUpdate {
227 update_id: CLOSED_CHANNEL_UPDATE_ID,
228 counterparty_node_id: None,
229 updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed {
230 should_broadcast: true,
231 }],
232 channel_id: Some(monitor.channel_id()),
233 };
234 close_background_events.push(
235 BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
236 *funding_txo,
237 channel_id,
238 monitor_update,
239 )),
240 );
241 }
242 }
243
244 const MAX_ALLOC_SIZE: usize = 1024 * 64;
245 let forward_htlcs_count: u64 = Readable::read(reader)?;
246 let mut forward_htlcs = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
247 for _ in 0..forward_htlcs_count {
248 let short_channel_id = Readable::read(reader)?;
249 let pending_forwards_count: u64 = Readable::read(reader)?;
250 let mut pending_forwards = Vec::with_capacity(cmp::min(
251 pending_forwards_count as usize,
252 MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
253 ));
254 for _ in 0..pending_forwards_count {
255 pending_forwards.push(Readable::read(reader)?);
256 }
257 forward_htlcs.insert(short_channel_id, pending_forwards);
258 }
259
260 let claimable_htlcs_count: u64 = Readable::read(reader)?;
261 let mut claimable_htlcs_list =
262 Vec::with_capacity(cmp::min(claimable_htlcs_count as usize, 128));
263 for _ in 0..claimable_htlcs_count {
264 let payment_hash = Readable::read(reader)?;
265 let previous_hops_len: u64 = Readable::read(reader)?;
266 let mut previous_hops = Vec::with_capacity(cmp::min(
267 previous_hops_len as usize,
268 MAX_ALLOC_SIZE / mem::size_of::<ClaimableHTLC>(),
269 ));
270 for _ in 0..previous_hops_len {
271 previous_hops.push(<ClaimableHTLC as Readable>::read(reader)?);
272 }
273 claimable_htlcs_list.push((payment_hash, previous_hops));
274 }
275
276 let peer_state_from_chans = |channel_by_id| PeerState {
277 channel_by_id,
278 inbound_channel_request_by_id: new_hash_map(),
279 latest_features: InitFeatures::empty(),
280 pending_msg_events: Vec::new(),
281 in_flight_monitor_updates: BTreeMap::new(),
282 monitor_update_blocked_actions: BTreeMap::new(),
283 actions_blocking_raa_monitor_updates: BTreeMap::new(),
284 is_connected: false,
285 };
286
287 let peer_count: u64 = Readable::read(reader)?;
288 let mut per_peer_state = hash_map_with_capacity(cmp::min(
289 peer_count as usize,
290 MAX_ALLOC_SIZE / mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>(),
291 ));
292 for _ in 0..peer_count {
293 let peer_pubkey = Readable::read(reader)?;
294 let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
295 let mut peer_state = peer_state_from_chans(peer_chans);
296 peer_state.latest_features = Readable::read(reader)?;
297 per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
298 }
299
300 let event_count: u64 = Readable::read(reader)?;
301 let mut pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)> =
302 VecDeque::with_capacity(cmp::min(
303 event_count as usize,
304 MAX_ALLOC_SIZE / mem::size_of::<(events::Event, Option<EventCompletionAction>)>(),
305 ));
306 for _ in 0..event_count {
307 match MaybeReadable::read(reader)? {
308 Some(event) => pending_events_read.push_back((event, None)),
309 None => continue,
310 }
311 }
312
313 let background_event_count: u64 = Readable::read(reader)?;
314 for _ in 0..background_event_count {
315 match <u8 as Readable>::read(reader)? {
316 0 => {
317 // LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here,
318 // however we really don't (and never did) need them - we regenerate all
319 // on-startup monitor updates.
320 let _: OutPoint = Readable::read(reader)?;
321 let _: ChannelMonitorUpdate = Readable::read(reader)?;
322 },
323 _ => return Err(DecodeError::InvalidValue),
324 }
325 }
326
327 let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
328 let highest_seen_timestamp: u32 = Readable::read(reader)?;
329
330 let pending_inbound_payment_count: u64 = Readable::read(reader)?;
331 let mut pending_inbound_payments: HashMap<PaymentHash, PendingInboundPayment> =
332 hash_map_with_capacity(cmp::min(
333 pending_inbound_payment_count as usize,
334 MAX_ALLOC_SIZE / (3 * 32),
335 ));
336 for _ in 0..pending_inbound_payment_count {
337 if pending_inbound_payments
338 .insert(Readable::read(reader)?, Readable::read(reader)?)
339 .is_some()
340 {
341 return Err(DecodeError::InvalidValue);
342 }
343 }
344
345 let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
346 let mut pending_outbound_payments_compat: HashMap<PaymentId, PendingOutboundPayment> =
347 hash_map_with_capacity(cmp::min(
348 pending_outbound_payments_count_compat as usize,
349 MAX_ALLOC_SIZE / 32,
350 ));
351 for _ in 0..pending_outbound_payments_count_compat {
352 let session_priv = Readable::read(reader)?;
353 let payment = PendingOutboundPayment::Legacy {
354 session_privs: hash_set_from_iter([session_priv]),
355 };
356 if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() {
357 return Err(DecodeError::InvalidValue);
358 };
359 }
360
361 // pending_outbound_payments_no_retry is for compatibility with 0.0.101 clients.
362 let mut pending_outbound_payments_no_retry: Option<HashMap<PaymentId, HashSet<[u8; 32]>>> =
363 None;
364 let mut pending_outbound_payments = None;
365 let mut pending_intercepted_htlcs: Option<HashMap<InterceptId, PendingAddHTLCInfo>> =
366 Some(new_hash_map());
367 let mut received_network_pubkey: Option<PublicKey> = None;
368 let mut fake_scid_rand_bytes: Option<[u8; 32]> = None;
369 let mut probing_cookie_secret: Option<[u8; 32]> = None;
370 let mut claimable_htlc_purposes = None;
371 let mut claimable_htlc_onion_fields = None;
372 let mut pending_claiming_payments = Some(new_hash_map());
373 let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> =
374 Some(Vec::new());
375 let mut events_override = None;
376 let mut in_flight_monitor_updates: Option<
377 HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>,
378 > = None;
379 let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
380 read_tlv_fields!(reader, {
381 (1, pending_outbound_payments_no_retry, option),
382 (2, pending_intercepted_htlcs, option),
383 (3, pending_outbound_payments, option),
384 (4, pending_claiming_payments, option),
385 (5, received_network_pubkey, option),
386 (6, monitor_update_blocked_actions_per_peer, option),
387 (7, fake_scid_rand_bytes, option),
388 (8, events_override, option),
389 (9, claimable_htlc_purposes, optional_vec),
390 (10, in_flight_monitor_updates, option),
391 (11, probing_cookie_secret, option),
392 (13, claimable_htlc_onion_fields, optional_vec),
393 (14, decode_update_add_htlcs, option),
394 });
395 let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
396 if fake_scid_rand_bytes.is_none() {
397 fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
398 }
399
400 if probing_cookie_secret.is_none() {
401 probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
402 }
403
404 if let Some(events) = events_override {
405 pending_events_read = events;
406 }
407
408 if !channel_closures.is_empty() {
409 pending_events_read.append(&mut channel_closures);
410 }
411
412 if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() {
413 pending_outbound_payments = Some(pending_outbound_payments_compat);
414 } else if pending_outbound_payments.is_none() {
415 let mut outbounds = new_hash_map();
416 for (id, session_privs) in pending_outbound_payments_no_retry.unwrap().drain() {
417 outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
418 }
419 pending_outbound_payments = Some(outbounds);
420 }
421 let pending_outbounds = OutboundPayments {
422 pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
423 retry_lock: Mutex::new(()),
424 color_source: args.color_source.clone(),
425 };
426
427 // We have to replay (or skip, if they were completed after we wrote the `ChannelManager`)
428 // each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to
429 // check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we
430 // replayed, and for each monitor update we have to replay we have to ensure there's a
431 // `ChannelMonitor` for it.
432 //
433 // In order to do so we first walk all of our live channels (so that we can check their
434 // state immediately after doing the update replays, when we have the `update_id`s
435 // available) and then walk any remaining in-flight updates.
436 //
437 // Because the actual handling of the in-flight updates is the same, it's macro'ized here:
438 let mut pending_background_events = Vec::new();
439 macro_rules! handle_in_flight_updates {
440 ($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr,
441 $monitor: expr, $peer_state: expr, $logger: expr, $channel_info_log: expr
442 ) => {{
443 let mut max_in_flight_update_id = 0;
444 $chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
445 for update in $chan_in_flight_upds.iter() {
446 log_trace!(
447 $logger,
448 "Replaying ChannelMonitorUpdate {} for {}channel {}",
449 update.update_id,
450 $channel_info_log,
451 &$monitor.channel_id()
452 );
453 max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
454 pending_background_events.push(
455 BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
456 counterparty_node_id: $counterparty_node_id,
457 funding_txo: $funding_txo,
458 channel_id: $monitor.channel_id(),
459 update: update.clone(),
460 },
461 );
462 }
463 if $chan_in_flight_upds.is_empty() {
464 // We had some updates to apply, but it turns out they had completed before we
465 // were serialized, we just weren't notified of that. Thus, we may have to run
466 // the completion actions for any monitor updates, but otherwise are done.
467 pending_background_events.push(BackgroundEvent::MonitorUpdatesComplete {
468 counterparty_node_id: $counterparty_node_id,
469 channel_id: $monitor.channel_id(),
470 });
471 }
472 if $peer_state
473 .in_flight_monitor_updates
474 .insert($funding_txo, $chan_in_flight_upds)
475 .is_some()
476 {
477 log_error!(
478 $logger,
479 "Duplicate in-flight monitor update set for the same channel!"
480 );
481 return Err(DecodeError::InvalidValue);
482 }
483 max_in_flight_update_id
484 }};
485 }
486
487 for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
488 let mut peer_state_lock = peer_state_mtx.lock().unwrap();
489 let peer_state = &mut *peer_state_lock;
490 for phase in peer_state.channel_by_id.values() {
491 if let ChannelPhase::Funded(chan) = phase {
492 let logger = WithChannelContext::from(&args.logger, &chan.context);
493
494 // Channels that were persisted have to be funded, otherwise they should have been
495 // discarded.
496 let funding_txo =
497 chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
498 let monitor = args
499 .channel_monitors
500 .get(&funding_txo)
501 .expect("We already checked for monitor presence when loading channels");
502 let mut max_in_flight_update_id = monitor.get_latest_update_id();
503 if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
504 if let Some(mut chan_in_flight_upds) =
505 in_flight_upds.remove(&(*counterparty_id, funding_txo))
506 {
507 max_in_flight_update_id = cmp::max(
508 max_in_flight_update_id,
509 handle_in_flight_updates!(
510 *counterparty_id,
511 chan_in_flight_upds,
512 funding_txo,
513 monitor,
514 peer_state,
515 logger,
516 ""
517 ),
518 );
519 }
520 }
521 if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
522 // If the channel is ahead of the monitor, return DangerousValue:
523 log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
524 log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
525 chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
526 log_error!(
527 logger,
528 " but the ChannelManager is at update_id {}.",
529 chan.get_latest_unblocked_monitor_update_id()
530 );
531 log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
532 log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
533 log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
534 log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
535 return Err(DecodeError::DangerousValue);
536 }
537 } else {
538 // We shouldn't have persisted (or read) any unfunded channel types so none should have been
539 // created in this `channel_by_id` map.
540 debug_assert!(false);
541 return Err(DecodeError::InvalidValue);
542 }
543 }
544 }
545
546 if let Some(in_flight_upds) = in_flight_monitor_updates {
547 for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
548 let channel_id = funding_txo_to_channel_id.get(&funding_txo).copied();
549 let logger = WithContext::from(&args.logger, Some(counterparty_id), channel_id);
550 if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
551 // Now that we've removed all the in-flight monitor updates for channels that are
552 // still open, we need to replay any monitor updates that are for closed channels,
553 // creating the neccessary peer_state entries as we go.
554 let peer_state_mutex = per_peer_state
555 .entry(counterparty_id)
556 .or_insert_with(|| Mutex::new(peer_state_from_chans(new_hash_map())));
557 let mut peer_state = peer_state_mutex.lock().unwrap();
558 handle_in_flight_updates!(
559 counterparty_id,
560 chan_in_flight_updates,
561 funding_txo,
562 monitor,
563 peer_state,
564 logger,
565 "closed "
566 );
567 } else {
568 log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
569 log_error!(
570 logger,
571 " The ChannelMonitor for channel {} is missing.",
572 if let Some(channel_id) = channel_id {
573 channel_id.to_string()
574 } else {
575 format!("with outpoint {}", funding_txo)
576 }
577 );
578 log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
579 log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
580 log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
581 log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
582 log_error!(
583 logger,
584 " Pending in-flight updates are: {:?}",
585 chan_in_flight_updates
586 );
587 return Err(DecodeError::InvalidValue);
588 }
589 }
590 }
591
592 // Note that we have to do the above replays before we push new monitor updates.
593 pending_background_events.append(&mut close_background_events);
594
595 // If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
596 // should ensure we try them again on the inbound edge. We put them here and do so after we
597 // have a fully-constructed `ChannelManager` at the end.
598 let mut pending_claims_to_replay = Vec::new();
599
600 {
601 // If we're tracking pending payments, ensure we haven't lost any by looking at the
602 // ChannelMonitor data for any channels for which we do not have authorative state
603 // (i.e. those for which we just force-closed above or we otherwise don't have a
604 // corresponding `Channel` at all).
605 // This avoids several edge-cases where we would otherwise "forget" about pending
606 // payments which are still in-flight via their on-chain state.
607 // We only rebuild the pending payments map if we were most recently serialized by
608 // 0.0.102+
609 for (_, monitor) in args.channel_monitors.iter() {
610 let counterparty_opt = outpoint_to_peer.get(&monitor.get_funding_txo().0);
611 if counterparty_opt.is_none() {
612 let logger = WithChannelMonitor::from(&args.logger, monitor);
613 for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs()
614 {
615 if let HTLCSource::OutboundRoute {
616 payment_id, session_priv, path, ..
617 } = htlc_source
618 {
619 if path.hops.is_empty() {
620 log_error!(logger, "Got an empty path for a pending payment");
621 return Err(DecodeError::InvalidValue);
622 }
623
624 let path_amt = path.final_value_msat();
625 let mut session_priv_bytes = [0; 32];
626 session_priv_bytes[..].copy_from_slice(&session_priv[..]);
627 match pending_outbounds
628 .pending_outbound_payments
629 .lock()
630 .unwrap()
631 .entry(payment_id)
632 {
633 hash_map::Entry::Occupied(mut entry) => {
634 let newly_added =
635 entry.get_mut().insert(session_priv_bytes, &path);
636 log_info!(logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}",
637 if newly_added { "Added" } else { "Had" }, path_amt, log_bytes!(session_priv_bytes), htlc.payment_hash);
638 },
639 hash_map::Entry::Vacant(entry) => {
640 let path_fee = path.fee_msat();
641 entry.insert(PendingOutboundPayment::Retryable {
642 retry_strategy: None,
643 attempts: PaymentAttempts::new(),
644 payment_params: None,
645 session_privs: hash_set_from_iter([session_priv_bytes]),
646 payment_hash: htlc.payment_hash,
647 payment_secret: None, // only used for retries, and we'll never retry on startup
648 payment_metadata: None, // only used for retries, and we'll never retry on startup
649 keysend_preimage: None, // only used for retries, and we'll never retry on startup
650 custom_tlvs: Vec::new(), // only used for retries, and we'll never retry on startup
651 pending_amt_msat: path_amt,
652 pending_fee_msat: Some(path_fee),
653 total_msat: path_amt,
654 starting_block_height: best_block_height,
655 remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup
656 });
657 log_info!(logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
658 path_amt, &htlc.payment_hash, log_bytes!(session_priv_bytes));
659 },
660 }
661 }
662 }
663 for (htlc_source, (htlc, preimage_opt)) in
664 monitor.get_all_current_outbound_htlcs()
665 {
666 match htlc_source {
667 HTLCSource::PreviousHopData(prev_hop_data) => {
668 let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| {
669 info.prev_funding_outpoint == prev_hop_data.outpoint
670 && info.prev_htlc_id == prev_hop_data.htlc_id
671 };
672 // The ChannelMonitor is now responsible for this HTLC's
673 // failure/success and will let us know what its outcome is. If we
674 // still have an entry for this HTLC in `forward_htlcs` or
675 // `pending_intercepted_htlcs`, we were apparently not persisted after
676 // the monitor was when forwarding the payment.
677 decode_update_add_htlcs.retain(|scid, update_add_htlcs| {
678 update_add_htlcs.retain(|update_add_htlc| {
679 let matches = *scid == prev_hop_data.short_channel_id &&
680 update_add_htlc.htlc_id == prev_hop_data.htlc_id;
681 if matches {
682 log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}",
683 &htlc.payment_hash, &monitor.channel_id());
684 }
685 !matches
686 });
687 !update_add_htlcs.is_empty()
688 });
689 forward_htlcs.retain(|_, forwards| {
690 forwards.retain(|forward| {
691 if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
692 if pending_forward_matches_htlc(&htlc_info) {
693 log_info!(logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}",
694 &htlc.payment_hash, &monitor.channel_id());
695 false
696 } else { true }
697 } else { true }
698 });
699 !forwards.is_empty()
700 });
701 pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| {
702 if pending_forward_matches_htlc(&htlc_info) {
703 log_info!(logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
704 &htlc.payment_hash, &monitor.channel_id());
705 pending_events_read.retain(|(event, _)| {
706 if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
707 intercepted_id != ev_id
708 } else { true }
709 });
710 false
711 } else { true }
712 });
713 },
714 HTLCSource::OutboundRoute {
715 payment_id, session_priv, path, ..
716 } => {
717 if let Some(preimage) = preimage_opt {
718 let pending_events = Mutex::new(pending_events_read);
719 // Note that we set `from_onchain` to "false" here,
720 // deliberately keeping the pending payment around forever.
721 // Given it should only occur when we have a channel we're
722 // force-closing for being stale that's okay.
723 // The alternative would be to wipe the state when claiming,
724 // generating a `PaymentPathSuccessful` event but regenerating
725 // it and the `PaymentSent` on every restart until the
726 // `ChannelMonitor` is removed.
727 let compl_action =
728 EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
729 channel_funding_outpoint: monitor.get_funding_txo().0,
730 channel_id: monitor.channel_id(),
731 counterparty_node_id: path.hops[0].pubkey,
732 };
733 pending_outbounds.claim_htlc(
734 payment_id,
735 preimage,
736 session_priv,
737 path,
738 false,
739 compl_action,
740 &pending_events,
741 &&logger,
742 );
743 pending_events_read = pending_events.into_inner().unwrap();
744 }
745 },
746 }
747 }
748 }
749
750 // Whether the downstream channel was closed or not, try to re-apply any payment
751 // preimages from it which may be needed in upstream channels for forwarded
752 // payments.
753 let outbound_claimed_htlcs_iter = monitor
754 .get_all_current_outbound_htlcs()
755 .into_iter()
756 .filter_map(|(htlc_source, (htlc, preimage_opt))| {
757 if let HTLCSource::PreviousHopData(_) = htlc_source {
758 if let Some(payment_preimage) = preimage_opt {
759 Some((
760 htlc_source,
761 payment_preimage,
762 htlc.amount_msat,
763 htlc.amount_rgb,
764 // Check if `counterparty_opt.is_none()` to see if the
765 // downstream chan is closed (because we don't have a
766 // channel_id -> peer map entry).
767 counterparty_opt.is_none(),
768 counterparty_opt
769 .cloned()
770 .or(monitor.get_counterparty_node_id()),
771 monitor.get_funding_txo().0,
772 monitor.channel_id(),
773 ))
774 } else {
775 None
776 }
777 } else {
778 // If it was an outbound payment, we've handled it above - if a preimage
779 // came in and we persisted the `ChannelManager` we either handled it and
780 // are good to go or the channel force-closed - we don't have to handle the
781 // channel still live case here.
782 None
783 }
784 });
785 for tuple in outbound_claimed_htlcs_iter {
786 pending_claims_to_replay.push(tuple);
787 }
788 }
789 }
790
791 if !forward_htlcs.is_empty()
792 || !decode_update_add_htlcs.is_empty()
793 || pending_outbounds.needs_abandon()
794 {
795 // If we have pending HTLCs to forward, assume we either dropped a
796 // `PendingHTLCsForwardable` or the user received it but never processed it as they
797 // shut down before the timer hit. Either way, set the time_forwardable to a small
798 // constant as enough time has likely passed that we should simply handle the forwards
799 // now, or at least after the user gets a chance to reconnect to our peers.
800 pending_events_read.push_back((
801 events::Event::PendingHTLCsForwardable { time_forwardable: Duration::from_secs(2) },
802 None,
803 ));
804 }
805
806 let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
807 let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);
808
809 let mut claimable_payments = hash_map_with_capacity(claimable_htlcs_list.len());
810 if let Some(purposes) = claimable_htlc_purposes {
811 if purposes.len() != claimable_htlcs_list.len() {
812 return Err(DecodeError::InvalidValue);
813 }
814 if let Some(onion_fields) = claimable_htlc_onion_fields {
815 if onion_fields.len() != claimable_htlcs_list.len() {
816 return Err(DecodeError::InvalidValue);
817 }
818 for (purpose, (onion, (payment_hash, htlcs))) in purposes
819 .into_iter()
820 .zip(onion_fields.into_iter().zip(claimable_htlcs_list.into_iter()))
821 {
822 let existing_payment = claimable_payments.insert(
823 payment_hash,
824 ClaimablePayment { purpose, htlcs, onion_fields: onion },
825 );
826 if existing_payment.is_some() {
827 return Err(DecodeError::InvalidValue);
828 }
829 }
830 } else {
831 for (purpose, (payment_hash, htlcs)) in
832 purposes.into_iter().zip(claimable_htlcs_list.into_iter())
833 {
834 let existing_payment = claimable_payments.insert(
835 payment_hash,
836 ClaimablePayment { purpose, htlcs, onion_fields: None },
837 );
838 if existing_payment.is_some() {
839 return Err(DecodeError::InvalidValue);
840 }
841 }
842 }
843 } else {
844 // LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but do
845 // include a `_legacy_hop_data` in the `OnionPayload`.
846 for (payment_hash, htlcs) in claimable_htlcs_list.drain(..) {
847 if htlcs.is_empty() {
848 return Err(DecodeError::InvalidValue);
849 }
850 let purpose = match &htlcs[0].onion_payload {
851 OnionPayload::Invoice { _legacy_hop_data } => {
852 if let Some(hop_data) = _legacy_hop_data {
853 events::PaymentPurpose::Bolt11InvoicePayment {
854 payment_preimage: match pending_inbound_payments.get(&payment_hash)
855 {
856 Some(inbound_payment) => inbound_payment.payment_preimage,
857 None => match inbound_payment::verify(
858 payment_hash,
859 &hop_data,
860 0,
861 &expanded_inbound_key,
862 &args.logger,
863 ) {
864 Ok((payment_preimage, _)) => payment_preimage,
865 Err(()) => {
866 log_error!(args.logger, "Failed to read claimable payment data for HTLC with payment hash {} - was not a pending inbound payment and didn't match our payment key", &payment_hash);
867 return Err(DecodeError::InvalidValue);
868 },
869 },
870 },
871 payment_secret: hop_data.payment_secret,
872 }
873 } else {
874 return Err(DecodeError::InvalidValue);
875 }
876 },
877 OnionPayload::Spontaneous(payment_preimage) => {
878 events::PaymentPurpose::SpontaneousPayment(*payment_preimage)
879 },
880 };
881 claimable_payments
882 .insert(payment_hash, ClaimablePayment { purpose, htlcs, onion_fields: None });
883 }
884 }
885
886 let mut secp_ctx = Secp256k1::new();
887 secp_ctx.seeded_randomize(&args.entropy_source.get_secure_random_bytes());
888
889 let our_network_pubkey = match args.node_signer.get_node_id(Recipient::Node) {
890 Ok(key) => key,
891 Err(()) => return Err(DecodeError::InvalidValue),
892 };
893 if let Some(network_pubkey) = received_network_pubkey {
894 if network_pubkey != our_network_pubkey {
895 log_error!(
896 args.logger,
897 "Key that was generated does not match the existing key. left: {}, right: {}",
898 network_pubkey,
899 our_network_pubkey
900 );
901 return Err(DecodeError::InvalidValue);
902 }
903 }
904
905 let mut outbound_scid_aliases = new_hash_set();
906 for (_peer_node_id, peer_state_mutex) in per_peer_state.iter_mut() {
907 let mut peer_state_lock = peer_state_mutex.lock().unwrap();
908 let peer_state = &mut *peer_state_lock;
909 for (chan_id, phase) in peer_state.channel_by_id.iter_mut() {
910 if let ChannelPhase::Funded(chan) = phase {
911 let logger = WithChannelContext::from(&args.logger, &chan.context);
912 if chan.context.outbound_scid_alias() == 0 {
913 let mut outbound_scid_alias;
914 loop {
915 outbound_scid_alias = fake_scid::Namespace::OutboundAlias
916 .get_fake_scid(
917 best_block_height,
918 &chain_hash,
919 fake_scid_rand_bytes.as_ref().unwrap(),
920 &args.entropy_source,
921 );
922 if outbound_scid_aliases.insert(outbound_scid_alias) {
923 break;
924 }
925 }
926 chan.context.set_outbound_scid_alias(outbound_scid_alias);
927 } else if !outbound_scid_aliases.insert(chan.context.outbound_scid_alias()) {
928 // Note that in rare cases its possible to hit this while reading an older
929 // channel if we just happened to pick a colliding outbound alias above.
930 log_error!(
931 logger,
932 "Got duplicate outbound SCID alias; {}",
933 chan.context.outbound_scid_alias()
934 );
935 return Err(DecodeError::InvalidValue);
936 }
937 if chan.context.is_usable() {
938 if short_to_chan_info
939 .insert(
940 chan.context.outbound_scid_alias(),
941 (chan.context.get_counterparty_node_id(), *chan_id),
942 )
943 .is_some()
944 {
945 // Note that in rare cases its possible to hit this while reading an older
946 // channel if we just happened to pick a colliding outbound alias above.
947 log_error!(
948 logger,
949 "Got duplicate outbound SCID alias; {}",
950 chan.context.outbound_scid_alias()
951 );
952 return Err(DecodeError::InvalidValue);
953 }
954 }
955 } else {
956 // We shouldn't have persisted (or read) any unfunded channel types so none should have been
957 // created in this `channel_by_id` map.
958 debug_assert!(false);
959 return Err(DecodeError::InvalidValue);
960 }
961 }
962 }
963
964 let bounded_fee_estimator = LowerBoundedFeeEstimator::new(args.fee_estimator);
965
966 for (_, monitor) in args.channel_monitors.iter() {
967 for (payment_hash, payment_preimage) in monitor.get_stored_preimages() {
968 if let Some(payment) = claimable_payments.remove(&payment_hash) {
969 log_info!(args.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
970 let mut claimable_amt_msat = 0;
971 let mut receiver_node_id = Some(our_network_pubkey);
972 let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
973 if phantom_shared_secret.is_some() {
974 let phantom_pubkey = args
975 .node_signer
976 .get_node_id(Recipient::PhantomNode)
977 .expect("Failed to get node_id for phantom node recipient");
978 receiver_node_id = Some(phantom_pubkey)
979 }
980 for claimable_htlc in &payment.htlcs {
981 claimable_amt_msat += claimable_htlc.value;
982
983 // Add a holding-cell claim of the payment to the Channel, which should be
984 // applied ~immediately on peer reconnection. Because it won't generate a
985 // new commitment transaction we can just provide the payment preimage to
986 // the corresponding ChannelMonitor and nothing else.
987 //
988 // We do so directly instead of via the normal ChannelMonitor update
989 // procedure as the ChainMonitor hasn't yet been initialized, implying
990 // we're not allowed to call it directly yet. Further, we do the update
991 // without incrementing the ChannelMonitor update ID as there isn't any
992 // reason to.
993 // If we were to generate a new ChannelMonitor update ID here and then
994 // crash before the user finishes block connect we'd end up force-closing
995 // this channel as well. On the flip side, there's no harm in restarting
996 // without the new monitor persisted - we'll end up right back here on
997 // restart.
998 let previous_channel_id = claimable_htlc.prev_hop.channel_id;
999 if let Some(peer_node_id) =
1000 outpoint_to_peer.get(&claimable_htlc.prev_hop.outpoint)
1001 {
1002 let peer_state_mutex = per_peer_state.get(peer_node_id).unwrap();
1003 let mut peer_state_lock = peer_state_mutex.lock().unwrap();
1004 let peer_state = &mut *peer_state_lock;
1005 if let Some(ChannelPhase::Funded(channel)) =
1006 peer_state.channel_by_id.get_mut(&previous_channel_id)
1007 {
1008 let logger =
1009 WithChannelContext::from(&args.logger, &channel.context);
1010 channel.claim_htlc_while_disconnected_dropping_mon_update(
1011 claimable_htlc.prev_hop.htlc_id,
1012 payment_preimage,
1013 &&logger,
1014 );
1015 }
1016 }
1017 if let Some(previous_hop_monitor) =
1018 args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint)
1019 {
1020 previous_hop_monitor.provide_payment_preimage(
1021 &payment_hash,
1022 &payment_preimage,
1023 &args.tx_broadcaster,
1024 &bounded_fee_estimator,
1025 &args.logger,
1026 );
1027 }
1028 }
1029 pending_events_read.push_back((
1030 events::Event::PaymentClaimed {
1031 receiver_node_id,
1032 payment_hash,
1033 purpose: payment.purpose,
1034 amount_msat: claimable_amt_msat,
1035 htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
1036 sender_intended_total_msat: payment
1037 .htlcs
1038 .first()
1039 .map(|htlc| htlc.total_msat),
1040 },
1041 None,
1042 ));
1043 }
1044 }
1045 }
1046
1047 for (node_id, monitor_update_blocked_actions) in
1048 monitor_update_blocked_actions_per_peer.unwrap()
1049 {
1050 if let Some(peer_state) = per_peer_state.get(&node_id) {
1051 for (channel_id, actions) in monitor_update_blocked_actions.iter() {
1052 let logger = WithContext::from(&args.logger, Some(node_id), Some(*channel_id));
1053 for action in actions.iter() {
1054 if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
1055 downstream_counterparty_and_funding_outpoint:
1056 Some((
1057 blocked_node_id,
1058 _blocked_channel_outpoint,
1059 blocked_channel_id,
1060 blocking_action,
1061 )),
1062 ..
1063 } = action
1064 {
1065 if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
1066 log_trace!(logger,
1067 "Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
1068 blocked_channel_id);
1069 blocked_peer_state
1070 .lock()
1071 .unwrap()
1072 .actions_blocking_raa_monitor_updates
1073 .entry(*blocked_channel_id)
1074 .or_insert_with(Vec::new)
1075 .push(blocking_action.clone());
1076 } else {
1077 // If the channel we were blocking has closed, we don't need to
1078 // worry about it - the blocked monitor update should never have
1079 // been released from the `Channel` object so it can't have
1080 // completed, and if the channel closed there's no reason to bother
1081 // anymore.
1082 }
1083 }
1084 if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
1085 ..
1086 } = action
1087 {
1088 debug_assert!(false, "Non-event-generating channel freeing should not appear in our queue");
1089 }
1090 }
1091 }
1092 peer_state.lock().unwrap().monitor_update_blocked_actions =
1093 monitor_update_blocked_actions;
1094 } else {
1095 log_error!(
1096 WithContext::from(&args.logger, Some(node_id), None),
1097 "Got blocked actions without a per-peer-state for {}",
1098 node_id
1099 );
1100 return Err(DecodeError::InvalidValue);
1101 }
1102 }
1103
1104 let channel_manager = ChannelManager {
1105 chain_hash,
1106 fee_estimator: bounded_fee_estimator,
1107 chain_monitor: args.chain_monitor,
1108 tx_broadcaster: args.tx_broadcaster,
1109 router: args.router,
1110
1111 best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)),
1112
1113 inbound_payment_key: expanded_inbound_key,
1114 pending_inbound_payments: Mutex::new(pending_inbound_payments),
1115 pending_outbound_payments: pending_outbounds,
1116 pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
1117
1118 forward_htlcs: Mutex::new(forward_htlcs),
1119 decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
1120 claimable_payments: Mutex::new(ClaimablePayments {
1121 claimable_payments,
1122 pending_claiming_payments: pending_claiming_payments.unwrap(),
1123 }),
1124 outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
1125 outpoint_to_peer: Mutex::new(outpoint_to_peer),
1126 short_to_chan_info: FairRwLock::new(short_to_chan_info),
1127 fake_scid_rand_bytes: fake_scid_rand_bytes.unwrap(),
1128
1129 probing_cookie_secret: probing_cookie_secret.unwrap(),
1130
1131 our_network_pubkey,
1132 secp_ctx,
1133
1134 highest_seen_timestamp: AtomicUsize::new(highest_seen_timestamp as usize),
1135
1136 per_peer_state: FairRwLock::new(per_peer_state),
1137
1138 pending_events: Mutex::new(pending_events_read),
1139 pending_events_processor: AtomicBool::new(false),
1140 pending_background_events: Mutex::new(pending_background_events),
1141 total_consistency_lock: RwLock::new(()),
1142 background_events_processed_since_startup: AtomicBool::new(false),
1143
1144 event_persist_notifier: Notifier::new(),
1145 needs_persist_flag: AtomicBool::new(false),
1146
1147 funding_batch_states: Mutex::new(BTreeMap::new()),
1148
1149 pending_offers_messages: Mutex::new(Vec::new()),
1150
1151 pending_broadcast_messages: Mutex::new(Vec::new()),
1152
1153 entropy_source: args.entropy_source,
1154 node_signer: args.node_signer,
1155 signer_provider: args.signer_provider,
1156
1157 logger: args.logger,
1158 default_configuration: args.default_config,
1159 color_source: args.color_source,
1160 };
1161
1162 for htlc_source in failed_htlcs.drain(..) {
1163 let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
1164 let receiver =
1165 HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
1166 let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
1167 channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
1168 }
1169
1170 for (
1171 source,
1172 preimage,
1173 downstream_value,
1174 downstream_rgb,
1175 downstream_closed,
1176 downstream_node_id,
1177 downstream_funding,
1178 downstream_channel_id,
1179 ) in pending_claims_to_replay
1180 {
1181 // We use `downstream_closed` in place of `from_onchain` here just as a guess - we
1182 // don't remember in the `ChannelMonitor` where we got a preimage from, but if the
1183 // channel is closed we just assume that it probably came from an on-chain claim.
1184 channel_manager.claim_funds_internal(
1185 source,
1186 preimage,
1187 Some(downstream_value),
1188 downstream_rgb,
1189 None,
1190 downstream_closed,
1191 true,
1192 downstream_node_id,
1193 downstream_funding,
1194 downstream_channel_id,
1195 None,
1196 );
1197 }
1198
1199 //TODO: Broadcast channel update for closed channels, but only after we've made a
1200 //connection or two.
1201
1202 Ok((best_block_hash.clone(), channel_manager))
1203 }
1204}
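Walkthrough
Here is how I read it.
First of all, I don't read it at all! I select `Readable::read` and press `Ctrl+Shift+L`, which in my editor selects every occurrence of the same text. I then extend each selection to the whole line.
Some deserialization calls sit inside loops, so I skim the function once more by hand and fill those in. The result is the following code: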
let chain_hash: ChainHash = Readable::read(reader)?;
let best_block_height: u32 = Readable::read(reader)?;
let best_block_hash: BlockHash = Readable::read(reader)?;
let channel_count: u64 = Readable::read(reader)?;
let forward_htlcs_count: u64 = Readable::read(reader)?;
for _ in 0..forward_htlcs_count {
    let short_channel_id = Readable::read(reader)?;
    let pending_forwards_count: u64 = Readable::read(reader)?;
    pending_forwards.push(Readable::read(reader)?);
}
let claimable_htlcs_count: u64 = Readable::read(reader)?;
for _ in 0..claimable_htlcs_count {
    let payment_hash = Readable::read(reader)?;
    let previous_hops_len: u64 = Readable::read(reader)?;
}
let peer_count: u64 = Readable::read(reader)?;
for _ in 0..peer_count {
    let peer_pubkey = Readable::read(reader)?;
    peer_state.latest_features = Readable::read(reader)?;
}
let event_count: u64 = Readable::read(reader)?;
for _ in 0..event_count {
    match MaybeReadable::read(reader)? {
        // (match arms omitted)
    }
}
let background_event_count: u64 = Readable::read(reader)?;
for _ in 0..background_event_count {
    let _: OutPoint = Readable::read(reader)?;
    let _: ChannelMonitorUpdate = Readable::read(reader)?;
}
let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
let highest_seen_timestamp: u32 = Readable::read(reader)?;
let pending_inbound_payment_count: u64 = Readable::read(reader)?;
for _ in 0..pending_inbound_payment_count {
    .insert(Readable::read(reader)?, Readable::read(reader)?)
}
let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
for _ in 0..pending_outbound_payments_count_compat {
    let session_priv = Readable::read(reader)?;
}
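The extract above keeps repeating one shape: a `u64` count is read first, then a loop reads that many items. Below is a minimal sketch of that shape using only the standard library. It does not reproduce LDK's `Readable` trait or its actual wire format. The helper names `read_u64` and `read_counted_list` are hypothetical, and the big-endian byte order is an assumption of this sketch. The allocation cap mirrors the `cmp::min(channel_count as usize, 128)` calls at the top of the function.

```rust
use std::cmp;
use std::io::{self, Read};

/// Reads one u64 from the stream.
/// Big-endian byte order is an assumption of this sketch.
fn read_u64<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut buf = [0u8; 8];
    reader.read_exact(&mut buf)?;
    Ok(u64::from_be_bytes(buf))
}

/// Reads a count, then that many u64 items (hypothetical helper).
fn read_counted_list<R: Read>(reader: &mut R) -> io::Result<Vec<u64>> {
    let count = read_u64(reader)?;
    // The count comes from untrusted bytes, so cap the up-front allocation
    // instead of trusting it; the vector still grows if the items really exist.
    let mut items = Vec::with_capacity(cmp::min(count as usize, 128));
    for _ in 0..count {
        // A truncated stream surfaces here as an error, not as a short list.
        items.push(read_u64(reader)?);
    }
    Ok(items)
}
```

When auditing what sits on disk, this is why each `*_count` field and the loop that follows it are best treated as a single unit: the count by itself carries no payload, but it decides how many records are parsed after it.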
This approach can still let a few things slip through, so I then search for `reader` to find what was missed:
let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
for _ in 0..channel_count {
    let mut channel: Channel<SP> = Channel::read(
        reader,
        (
            &args.entropy_source,
            &args.signer_provider,
            best_block_height,
            &provided_channel_type_features(&args.default_config),
            args.color_source.clone(),
        ),
    )?;
}
for _ in 0..claimable_htlcs_count {
    for _ in 0..previous_hops_len {
        previous_hops.push(<ClaimableHTLC as Readable>::read(reader)?);
    }
}
for _ in 0..background_event_count {
    match <u8 as Readable>::read(reader)? {
        0 => {
            // LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here,
            // however we really don't (and never did) need them - we regenerate all
            // on-startup monitor updates.
        },
        _ => return Err(DecodeError::InvalidValue),
    }
}
read_tlv_fields!(reader, {
    (1, pending_outbound_payments_no_retry, option),
    (2, pending_intercepted_htlcs, option),
    (3, pending_outbound_payments, option),
    (4, pending_claiming_payments, option),
    (5, received_network_pubkey, option),
    (6, monitor_update_blocked_actions_per_peer, option),
    (7, fake_scid_rand_bytes, option),
    (8, events_override, option),
    (9, claimable_htlc_purposes, optional_vec),
    (10, in_flight_monitor_updates, option),
    (11, probing_cookie_secret, option),
    (13, claimable_htlc_onion_fields, optional_vec),
    (14, decode_update_add_htlcs, option),
});
Good, that's everything.
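The two search passes can also be scripted rather than done in an editor. The sketch below is one possible way to do it, not the workflow used in this article: `lines_containing` and `leftover_lines` are hypothetical helper names, and matching is plain substring search, so it has no notion of Rust syntax.

```rust
/// First pass: every line that contains `needle`, with its 1-based line number.
fn lines_containing<'a>(source: &'a str, needle: &str) -> Vec<(usize, &'a str)> {
    source
        .lines()
        .enumerate()
        .filter(|(_, line)| line.contains(needle))
        .map(|(index, line)| (index + 1, line.trim()))
        .collect()
}

/// Second pass: lines that mention `fallback` but were not caught by `primary`.
fn leftover_lines<'a>(
    source: &'a str,
    primary: &str,
    fallback: &str,
) -> Vec<(usize, &'a str)> {
    lines_containing(source, fallback)
        .into_iter()
        .filter(|(_, line)| !line.contains(primary))
        .collect()
}
```

Calling `lines_containing(src, "Readable::read")` corresponds to the first pass, and `leftover_lines(src, "Readable::read", "reader")` to the second. Neither call recovers the enclosing `for` loops, so the manual skim for loop headers is still needed.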
Final result
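| No. | Field | Type | Description |
|---|---|---|---|
| 1 | `chain_hash` | `ChainHash` | Identifier of the blockchain network. |
| 2 | `best_block_height` | `u32` | Height of the best (most recent) block. |
| 3 | `best_block_hash` | `BlockHash` | Hash of the best block. |
| 4 | `channel_count` | `u64` | Total number of channels managed. |
| 5 | `forward_htlcs_count` | `u64` | Number of forward-HTLC entries. |
| 6 | Forward HTLCs | | |
| | `short_channel_id` | `u64` | Short channel ID the forwards belong to. |
| | `pending_forwards_count` | `u64` | Number of pending forwards for this channel. |
| | `pending_forwards` | `HTLCForwardInfo` (list) | List of pending forwards. |
| 7 | `claimable_htlcs_count` | `u64` | Number of claimable-HTLC entries. |
| 8 | Claimable HTLCs | | |
| | `payment_hash` | `PaymentHash` | Payment hash the HTLCs belong to. |
| | `previous_hops_len` | `u64` | Number of `ClaimableHTLC` records stored for this payment hash. |
| 9 | `peer_count` | `u64` | Total number of peers. |
| 10 | Peers | | |
| | `peer_pubkey` | `PublicKey` | Public key of the peer. |
| | `peer_state.latest_features` | `InitFeatures` | Latest feature set announced by the peer. |
| 11 | `event_count` | `u64` | Number of recorded events. |
| 12 | Events | `Event` (list) | List of events, deserialized with `MaybeReadable`. |
| 13 | `background_event_count` | `u64` | Number of background events. |
| 14 | Background events | | |
| | `OutPoint` | `OutPoint` | Transaction outpoint; the value is read and discarded. |
| | `ChannelMonitorUpdate` | `ChannelMonitorUpdate` | Channel monitor update; the value is read and discarded. |
| 15 | `_last_node_announcement_serial` | `u32` | Deprecated serial number of the last node announcement (only used by versions < 0.0.111). |
| 16 | `highest_seen_timestamp` | `u32` | Highest block timestamp seen so far. |
| 17 | `pending_inbound_payment_count` | `u64` | Number of pending inbound payments. |
| 18 | Pending inbound payments | | |
| | (key) | `PaymentHash` | Payment hash identifying the inbound payment. |
| | (value) | `PendingInboundPayment` | Data associated with the pending inbound payment. |
| 19 | `pending_outbound_payments_count_compat` | `u64` | Number of pending outbound payments (compatibility layer). |
| 20 | Pending outbound payments (compatibility layer) | | |
| | `session_priv` | `[u8; 32]` | Session private key bytes of the outbound payment. |
| 21 | `_ver` | `u8` (version prefix) | Serialization version prefix, checked against `SERIALIZATION_VERSION`. |
| 22 | Channels | | |
| | `Channel<SP>` | `Channel` (list) | List of channel states, deserialized with context arguments. |
| 23 | Previous hops of claimable HTLCs | | |
| | `ClaimableHTLC` | `ClaimableHTLC` (list) | Previous-hop details for each claimable HTLC. |
| 24 | Background event check | | |
| | `u8` | `u8` | Background event type tag; only `0` is accepted (written by LDK < 0.0.116). |
| 25 | TLV fields | | |
| | 1: `pending_outbound_payments_no_retry` | optional field | Pending outbound payments without retries. |
| | 2: `pending_intercepted_htlcs` | optional field | Pending intercepted HTLCs. |
| | 3: `pending_outbound_payments` | optional field | Pending outbound payments that can be retried. |
| | 4: `pending_claiming_payments` | optional field | Payments that are in the process of being claimed. |
| | 5: `received_network_pubkey` | optional field | Node public key stored in the serialized data; compared against the key reported by the node signer. |
| | 6: `monitor_update_blocked_actions_per_peer` | optional field | Per-peer actions blocked on monitor updates. |
| | 7: `fake_scid_rand_bytes` | optional field | Random bytes used for fake short channel IDs (SCIDs). |
| | 8: `events_override` | optional field | Override for the event list. |
| | 9: `claimable_htlc_purposes` | optional vector field | Purposes associated with the claimable HTLCs. |
| | 10: `in_flight_monitor_updates` | optional field | Monitor updates currently in flight. |
| | 11: `probing_cookie_secret` | optional field | Secret used for probing cookies. |
| | 13: `claimable_htlc_onion_fields` | optional vector field | Onion fields of the claimable HTLCs. |
| | 14: `decode_update_add_htlcs` | optional field | Pending HTLC additions that still need to be decoded. |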
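Summary
Efficiency! Efficiency! And more efficiency!
When reading code like this, where the business rules are complex, the logic is simple, and the function is extremely long, the key is not to grind through it line by line. Focus instead on how each input to the function is handled and what output or side effect it produces. This makes reading much more efficient and gets you to your goal. Along the way, feel free to explore techniques of your own (PRs with additions are welcome!).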