Reading Large-Scale Code: Challenges and Practices (4-EX), Bonus Chapter: Taking On a 1000-Line Function

In the previous chapter we covered how to read code by tracing backwards. But the example code was far too short. In real work, functions of several hundred or even a thousand lines are common (they can exist even in some fairly good open-source projects).

In this chapter we walk you through a 1000-line function. With code this long, reading with a goal in mind is considerably more efficient.

The function we will read is below: ChannelManager::read, taken from rust-lightning. Its job is to restore a ChannelManager object from any binary data source, such as disk.
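
Before the listing, a quick orientation on the signature you are about to see. The function implements the ReadableArgs pattern: deserialization takes both a byte reader and a bundle of live runtime arguments, because a ChannelManager cannot be rebuilt from bytes alone. A minimal sketch of that shape (the types here are made up for illustration, not LDK's real signatures):

    use std::io::{self, Read};

    // Simplified stand-in for LDK's ReadableArgs trait: deserialization that
    // needs live context (keys, config, monitors, ...) receives it as `args`.
    trait ReadableArgs<A>: Sized {
        fn read<R: Read>(reader: &mut R, args: A) -> io::Result<Self>;
    }

    struct RestoreArgs { best_block_height: u32 }   // hypothetical context
    struct Manager { stored_height: u32 }           // hypothetical result

    impl ReadableArgs<RestoreArgs> for Manager {
        fn read<R: Read>(reader: &mut R, args: RestoreArgs) -> io::Result<Self> {
            let mut buf = [0u8; 4];
            reader.read_exact(&mut buf)?; // raw bytes from the persisted blob...
            let stored_height = u32::from_be_bytes(buf);
            // ...validated against context that only the caller can supply.
            if stored_height > args.best_block_height {
                return Err(io::Error::new(io::ErrorKind::InvalidData, "stale data"));
            }
            Ok(Manager { stored_height })
        }
    }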

The goal we set for ourselves: understand what exactly it reads back from disk, so that we can analyze whether it is safe for that data to live on disk.

Setting a sensible goal matters. A thousand lines of code is enough to implement a compiler in some languages; there is no reason to sink that kind of effort into an ordinary data-plumbing project.

The Code

Give it a try yourself first.

   1impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
   2	ReadableArgs<ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, L>>
   3	for (BlockHash, ChannelManager<M, T, ES, NS, SP, F, R, L>)
   4where
   5	M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
   6	T::Target: BroadcasterInterface,
   7	ES::Target: EntropySource,
   8	NS::Target: NodeSigner,
   9	SP::Target: SignerProvider,
  10	F::Target: FeeEstimator,
  11	R::Target: Router,
  12	L::Target: Logger,
  13{
  14	fn read<Reader: io::Read>(
  15		reader: &mut Reader, mut args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, L>,
  16	) -> Result<Self, DecodeError> {
  17		let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
  18
  19		let chain_hash: ChainHash = Readable::read(reader)?;
  20		let best_block_height: u32 = Readable::read(reader)?;
  21		let best_block_hash: BlockHash = Readable::read(reader)?;
  22
  23		let mut failed_htlcs = Vec::new();
  24
  25		let channel_count: u64 = Readable::read(reader)?;
  26		let mut funding_txo_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128));
  27		let mut funded_peer_channels: HashMap<PublicKey, HashMap<ChannelId, ChannelPhase<SP>>> =
  28			hash_map_with_capacity(cmp::min(channel_count as usize, 128));
  29		let mut outpoint_to_peer = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
  30		let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128));
  31		let mut channel_closures = VecDeque::new();
  32		let mut close_background_events = Vec::new();
  33		let mut funding_txo_to_channel_id = hash_map_with_capacity(channel_count as usize);
  34		for _ in 0..channel_count {
  35			let mut channel: Channel<SP> = Channel::read(
  36				reader,
  37				(
  38					&args.entropy_source,
  39					&args.signer_provider,
  40					best_block_height,
  41					&provided_channel_type_features(&args.default_config),
  42					args.color_source.clone(),
  43				),
  44			)?;
  45			let logger = WithChannelContext::from(&args.logger, &channel.context);
  46			let funding_txo = channel.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
  47			funding_txo_to_channel_id.insert(funding_txo, channel.context.channel_id());
  48			funding_txo_set.insert(funding_txo.clone());
  49			if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
  50				if channel.get_cur_holder_commitment_transaction_number()
  51					> monitor.get_cur_holder_commitment_number()
  52					|| channel.get_revoked_counterparty_commitment_transaction_number()
  53						> monitor.get_min_seen_secret()
  54					|| channel.get_cur_counterparty_commitment_transaction_number()
  55						> monitor.get_cur_counterparty_commitment_number()
  56					|| channel.context.get_latest_monitor_update_id()
  57						< monitor.get_latest_update_id()
  58				{
  59					// But if the channel is behind the monitor, close the channel:
  60					log_error!(
  61						logger,
  62						"A ChannelManager is stale compared to the current ChannelMonitor!"
  63					);
  64					log_error!(logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast.");
  65					if channel.context.get_latest_monitor_update_id()
  66						< monitor.get_latest_update_id()
  67					{
  68						log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
  69							&channel.context.channel_id(), monitor.get_latest_update_id(), channel.context.get_latest_monitor_update_id());
  70					}
  71					if channel.get_cur_holder_commitment_transaction_number()
  72						> monitor.get_cur_holder_commitment_number()
  73					{
  74						log_error!(logger, " The ChannelMonitor for channel {} is at holder commitment number {} but the ChannelManager is at holder commitment number {}.",
  75							&channel.context.channel_id(), monitor.get_cur_holder_commitment_number(), channel.get_cur_holder_commitment_transaction_number());
  76					}
  77					if channel.get_revoked_counterparty_commitment_transaction_number()
  78						> monitor.get_min_seen_secret()
  79					{
  80						log_error!(logger, " The ChannelMonitor for channel {} is at revoked counterparty transaction number {} but the ChannelManager is at revoked counterparty transaction number {}.",
  81							&channel.context.channel_id(), monitor.get_min_seen_secret(), channel.get_revoked_counterparty_commitment_transaction_number());
  82					}
  83					if channel.get_cur_counterparty_commitment_transaction_number()
  84						> monitor.get_cur_counterparty_commitment_number()
  85					{
  86						log_error!(logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
  87							&channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
  88					}
  89					let mut shutdown_result =
  90						channel.context.force_shutdown(true, ClosureReason::OutdatedChannelManager);
  91					if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
  92						return Err(DecodeError::InvalidValue);
  93					}
  94					if let Some((counterparty_node_id, funding_txo, channel_id, update)) =
  95						shutdown_result.monitor_update
  96					{
  97						close_background_events.push(
  98							BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
  99								counterparty_node_id,
 100								funding_txo,
 101								channel_id,
 102								update,
 103							},
 104						);
 105					}
 106					failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
 107					channel_closures.push_back((
 108						events::Event::ChannelClosed {
 109							channel_id: channel.context.channel_id(),
 110							user_channel_id: channel.context.get_user_id(),
 111							reason: ClosureReason::OutdatedChannelManager,
 112							counterparty_node_id: Some(channel.context.get_counterparty_node_id()),
 113							channel_capacity_sats: Some(channel.context.get_value_satoshis()),
 114							channel_funding_txo: channel.context.get_funding_txo(),
 115						},
 116						None,
 117					));
 118					for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
 119						let mut found_htlc = false;
 120						for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
 121							if *channel_htlc_source == monitor_htlc_source {
 122								found_htlc = true;
 123								break;
 124							}
 125						}
 126						if !found_htlc {
 127							// If we have some HTLCs in the channel which are not present in the newer
 128							// ChannelMonitor, they have been removed and should be failed back to
 129							// ensure we don't forget them entirely. Note that if the missing HTLC(s)
 130							// were actually claimed we'd have generated and ensured the previous-hop
  131							// claim update ChannelMonitor updates were persisted prior to persisting
 132							// the ChannelMonitor update for the forward leg, so attempting to fail the
 133							// backwards leg of the HTLC will simply be rejected.
 134							log_info!(logger,
 135								"Failing HTLC with hash {} as it is missing in the ChannelMonitor for channel {} but was present in the (stale) ChannelManager",
 136								&channel.context.channel_id(), &payment_hash);
 137							failed_htlcs.push((
 138								channel_htlc_source.clone(),
 139								*payment_hash,
 140								channel.context.get_counterparty_node_id(),
 141								channel.context.channel_id(),
 142							));
 143						}
 144					}
 145				} else {
 146					channel.on_startup_drop_completed_blocked_mon_updates_through(
 147						&logger,
 148						monitor.get_latest_update_id(),
 149					);
 150					log_info!(logger, "Successfully loaded channel {} at update_id {} against monitor at update id {} with {} blocked updates",
 151						&channel.context.channel_id(), channel.context.get_latest_monitor_update_id(),
 152						monitor.get_latest_update_id(), channel.blocked_monitor_updates_pending());
 153					if let Some(short_channel_id) = channel.context.get_short_channel_id() {
 154						short_to_chan_info.insert(
 155							short_channel_id,
 156							(
 157								channel.context.get_counterparty_node_id(),
 158								channel.context.channel_id(),
 159							),
 160						);
 161					}
 162					if let Some(funding_txo) = channel.context.get_funding_txo() {
 163						outpoint_to_peer
 164							.insert(funding_txo, channel.context.get_counterparty_node_id());
 165					}
 166					match funded_peer_channels.entry(channel.context.get_counterparty_node_id()) {
 167						hash_map::Entry::Occupied(mut entry) => {
 168							let by_id_map = entry.get_mut();
 169							by_id_map.insert(
 170								channel.context.channel_id(),
 171								ChannelPhase::Funded(channel),
 172							);
 173						},
 174						hash_map::Entry::Vacant(entry) => {
 175							let mut by_id_map = new_hash_map();
 176							by_id_map.insert(
 177								channel.context.channel_id(),
 178								ChannelPhase::Funded(channel),
 179							);
 180							entry.insert(by_id_map);
 181						},
 182					}
 183				}
 184			} else if channel.is_awaiting_initial_mon_persist() {
 185				// If we were persisted and shut down while the initial ChannelMonitor persistence
 186				// was in-progress, we never broadcasted the funding transaction and can still
 187				// safely discard the channel.
 188				let _ = channel.context.force_shutdown(false, ClosureReason::DisconnectedPeer);
 189				channel_closures.push_back((
 190					events::Event::ChannelClosed {
 191						channel_id: channel.context.channel_id(),
 192						user_channel_id: channel.context.get_user_id(),
 193						reason: ClosureReason::DisconnectedPeer,
 194						counterparty_node_id: Some(channel.context.get_counterparty_node_id()),
 195						channel_capacity_sats: Some(channel.context.get_value_satoshis()),
 196						channel_funding_txo: channel.context.get_funding_txo(),
 197					},
 198					None,
 199				));
 200			} else {
 201				log_error!(
 202					logger,
 203					"Missing ChannelMonitor for channel {} needed by ChannelManager.",
 204					&channel.context.channel_id()
 205				);
 206				log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
 207				log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
 208				log_error!(
 209					logger,
 210					" Without the ChannelMonitor we cannot continue without risking funds."
 211				);
 212				log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
 213				return Err(DecodeError::InvalidValue);
 214			}
 215		}
 216
 217		for (funding_txo, monitor) in args.channel_monitors.iter() {
 218			if !funding_txo_set.contains(funding_txo) {
 219				let logger = WithChannelMonitor::from(&args.logger, monitor);
 220				let channel_id = monitor.channel_id();
 221				log_info!(
 222					logger,
 223					"Queueing monitor update to ensure missing channel {} is force closed",
 224					&channel_id
 225				);
 226				let monitor_update = ChannelMonitorUpdate {
 227					update_id: CLOSED_CHANNEL_UPDATE_ID,
 228					counterparty_node_id: None,
 229					updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed {
 230						should_broadcast: true,
 231					}],
 232					channel_id: Some(monitor.channel_id()),
 233				};
 234				close_background_events.push(
 235					BackgroundEvent::ClosedMonitorUpdateRegeneratedOnStartup((
 236						*funding_txo,
 237						channel_id,
 238						monitor_update,
 239					)),
 240				);
 241			}
 242		}
 243
 244		const MAX_ALLOC_SIZE: usize = 1024 * 64;
 245		let forward_htlcs_count: u64 = Readable::read(reader)?;
 246		let mut forward_htlcs = hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128));
 247		for _ in 0..forward_htlcs_count {
 248			let short_channel_id = Readable::read(reader)?;
 249			let pending_forwards_count: u64 = Readable::read(reader)?;
 250			let mut pending_forwards = Vec::with_capacity(cmp::min(
 251				pending_forwards_count as usize,
 252				MAX_ALLOC_SIZE / mem::size_of::<HTLCForwardInfo>(),
 253			));
 254			for _ in 0..pending_forwards_count {
 255				pending_forwards.push(Readable::read(reader)?);
 256			}
 257			forward_htlcs.insert(short_channel_id, pending_forwards);
 258		}
 259
 260		let claimable_htlcs_count: u64 = Readable::read(reader)?;
 261		let mut claimable_htlcs_list =
 262			Vec::with_capacity(cmp::min(claimable_htlcs_count as usize, 128));
 263		for _ in 0..claimable_htlcs_count {
 264			let payment_hash = Readable::read(reader)?;
 265			let previous_hops_len: u64 = Readable::read(reader)?;
 266			let mut previous_hops = Vec::with_capacity(cmp::min(
 267				previous_hops_len as usize,
 268				MAX_ALLOC_SIZE / mem::size_of::<ClaimableHTLC>(),
 269			));
 270			for _ in 0..previous_hops_len {
 271				previous_hops.push(<ClaimableHTLC as Readable>::read(reader)?);
 272			}
 273			claimable_htlcs_list.push((payment_hash, previous_hops));
 274		}
 275
 276		let peer_state_from_chans = |channel_by_id| PeerState {
 277			channel_by_id,
 278			inbound_channel_request_by_id: new_hash_map(),
 279			latest_features: InitFeatures::empty(),
 280			pending_msg_events: Vec::new(),
 281			in_flight_monitor_updates: BTreeMap::new(),
 282			monitor_update_blocked_actions: BTreeMap::new(),
 283			actions_blocking_raa_monitor_updates: BTreeMap::new(),
 284			is_connected: false,
 285		};
 286
 287		let peer_count: u64 = Readable::read(reader)?;
 288		let mut per_peer_state = hash_map_with_capacity(cmp::min(
 289			peer_count as usize,
 290			MAX_ALLOC_SIZE / mem::size_of::<(PublicKey, Mutex<PeerState<SP>>)>(),
 291		));
 292		for _ in 0..peer_count {
 293			let peer_pubkey = Readable::read(reader)?;
 294			let peer_chans = funded_peer_channels.remove(&peer_pubkey).unwrap_or(new_hash_map());
 295			let mut peer_state = peer_state_from_chans(peer_chans);
 296			peer_state.latest_features = Readable::read(reader)?;
 297			per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
 298		}
 299
 300		let event_count: u64 = Readable::read(reader)?;
 301		let mut pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)> =
 302			VecDeque::with_capacity(cmp::min(
 303				event_count as usize,
 304				MAX_ALLOC_SIZE / mem::size_of::<(events::Event, Option<EventCompletionAction>)>(),
 305			));
 306		for _ in 0..event_count {
 307			match MaybeReadable::read(reader)? {
 308				Some(event) => pending_events_read.push_back((event, None)),
 309				None => continue,
 310			}
 311		}
 312
 313		let background_event_count: u64 = Readable::read(reader)?;
 314		for _ in 0..background_event_count {
 315			match <u8 as Readable>::read(reader)? {
 316				0 => {
 317					// LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here,
 318					// however we really don't (and never did) need them - we regenerate all
 319					// on-startup monitor updates.
 320					let _: OutPoint = Readable::read(reader)?;
 321					let _: ChannelMonitorUpdate = Readable::read(reader)?;
 322				},
 323				_ => return Err(DecodeError::InvalidValue),
 324			}
 325		}
 326
 327		let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
 328		let highest_seen_timestamp: u32 = Readable::read(reader)?;
 329
 330		let pending_inbound_payment_count: u64 = Readable::read(reader)?;
 331		let mut pending_inbound_payments: HashMap<PaymentHash, PendingInboundPayment> =
 332			hash_map_with_capacity(cmp::min(
 333				pending_inbound_payment_count as usize,
 334				MAX_ALLOC_SIZE / (3 * 32),
 335			));
 336		for _ in 0..pending_inbound_payment_count {
 337			if pending_inbound_payments
 338				.insert(Readable::read(reader)?, Readable::read(reader)?)
 339				.is_some()
 340			{
 341				return Err(DecodeError::InvalidValue);
 342			}
 343		}
 344
 345		let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
 346		let mut pending_outbound_payments_compat: HashMap<PaymentId, PendingOutboundPayment> =
 347			hash_map_with_capacity(cmp::min(
 348				pending_outbound_payments_count_compat as usize,
 349				MAX_ALLOC_SIZE / 32,
 350			));
 351		for _ in 0..pending_outbound_payments_count_compat {
 352			let session_priv = Readable::read(reader)?;
 353			let payment = PendingOutboundPayment::Legacy {
 354				session_privs: hash_set_from_iter([session_priv]),
 355			};
 356			if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() {
 357				return Err(DecodeError::InvalidValue);
 358			};
 359		}
 360
 361		// pending_outbound_payments_no_retry is for compatibility with 0.0.101 clients.
 362		let mut pending_outbound_payments_no_retry: Option<HashMap<PaymentId, HashSet<[u8; 32]>>> =
 363			None;
 364		let mut pending_outbound_payments = None;
 365		let mut pending_intercepted_htlcs: Option<HashMap<InterceptId, PendingAddHTLCInfo>> =
 366			Some(new_hash_map());
 367		let mut received_network_pubkey: Option<PublicKey> = None;
 368		let mut fake_scid_rand_bytes: Option<[u8; 32]> = None;
 369		let mut probing_cookie_secret: Option<[u8; 32]> = None;
 370		let mut claimable_htlc_purposes = None;
 371		let mut claimable_htlc_onion_fields = None;
 372		let mut pending_claiming_payments = Some(new_hash_map());
 373		let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> =
 374			Some(Vec::new());
 375		let mut events_override = None;
 376		let mut in_flight_monitor_updates: Option<
 377			HashMap<(PublicKey, OutPoint), Vec<ChannelMonitorUpdate>>,
 378		> = None;
 379		let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 380		read_tlv_fields!(reader, {
 381			(1, pending_outbound_payments_no_retry, option),
 382			(2, pending_intercepted_htlcs, option),
 383			(3, pending_outbound_payments, option),
 384			(4, pending_claiming_payments, option),
 385			(5, received_network_pubkey, option),
 386			(6, monitor_update_blocked_actions_per_peer, option),
 387			(7, fake_scid_rand_bytes, option),
 388			(8, events_override, option),
 389			(9, claimable_htlc_purposes, optional_vec),
 390			(10, in_flight_monitor_updates, option),
 391			(11, probing_cookie_secret, option),
 392			(13, claimable_htlc_onion_fields, optional_vec),
 393			(14, decode_update_add_htlcs, option),
 394		});
 395		let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
 396		if fake_scid_rand_bytes.is_none() {
 397			fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 398		}
 399
 400		if probing_cookie_secret.is_none() {
 401			probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
 402		}
 403
 404		if let Some(events) = events_override {
 405			pending_events_read = events;
 406		}
 407
 408		if !channel_closures.is_empty() {
 409			pending_events_read.append(&mut channel_closures);
 410		}
 411
 412		if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() {
 413			pending_outbound_payments = Some(pending_outbound_payments_compat);
 414		} else if pending_outbound_payments.is_none() {
 415			let mut outbounds = new_hash_map();
 416			for (id, session_privs) in pending_outbound_payments_no_retry.unwrap().drain() {
 417				outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs });
 418			}
 419			pending_outbound_payments = Some(outbounds);
 420		}
 421		let pending_outbounds = OutboundPayments {
 422			pending_outbound_payments: Mutex::new(pending_outbound_payments.unwrap()),
 423			retry_lock: Mutex::new(()),
 424			color_source: args.color_source.clone(),
 425		};
 426
 427		// We have to replay (or skip, if they were completed after we wrote the `ChannelManager`)
 428		// each `ChannelMonitorUpdate` in `in_flight_monitor_updates`. After doing so, we have to
 429		// check that each channel we have isn't newer than the latest `ChannelMonitorUpdate`(s) we
 430		// replayed, and for each monitor update we have to replay we have to ensure there's a
 431		// `ChannelMonitor` for it.
 432		//
 433		// In order to do so we first walk all of our live channels (so that we can check their
 434		// state immediately after doing the update replays, when we have the `update_id`s
 435		// available) and then walk any remaining in-flight updates.
 436		//
 437		// Because the actual handling of the in-flight updates is the same, it's macro'ized here:
 438		let mut pending_background_events = Vec::new();
 439		macro_rules! handle_in_flight_updates {
 440			($counterparty_node_id: expr, $chan_in_flight_upds: expr, $funding_txo: expr,
 441			 $monitor: expr, $peer_state: expr, $logger: expr, $channel_info_log: expr
 442			) => {{
 443				let mut max_in_flight_update_id = 0;
 444				$chan_in_flight_upds.retain(|upd| upd.update_id > $monitor.get_latest_update_id());
 445				for update in $chan_in_flight_upds.iter() {
 446					log_trace!(
 447						$logger,
 448						"Replaying ChannelMonitorUpdate {} for {}channel {}",
 449						update.update_id,
 450						$channel_info_log,
 451						&$monitor.channel_id()
 452					);
 453					max_in_flight_update_id = cmp::max(max_in_flight_update_id, update.update_id);
 454					pending_background_events.push(
 455						BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
 456							counterparty_node_id: $counterparty_node_id,
 457							funding_txo: $funding_txo,
 458							channel_id: $monitor.channel_id(),
 459							update: update.clone(),
 460						},
 461					);
 462				}
 463				if $chan_in_flight_upds.is_empty() {
 464					// We had some updates to apply, but it turns out they had completed before we
 465					// were serialized, we just weren't notified of that. Thus, we may have to run
 466					// the completion actions for any monitor updates, but otherwise are done.
 467					pending_background_events.push(BackgroundEvent::MonitorUpdatesComplete {
 468						counterparty_node_id: $counterparty_node_id,
 469						channel_id: $monitor.channel_id(),
 470					});
 471				}
 472				if $peer_state
 473					.in_flight_monitor_updates
 474					.insert($funding_txo, $chan_in_flight_upds)
 475					.is_some()
 476				{
 477					log_error!(
 478						$logger,
 479						"Duplicate in-flight monitor update set for the same channel!"
 480					);
 481					return Err(DecodeError::InvalidValue);
 482				}
 483				max_in_flight_update_id
 484			}};
 485		}
 486
 487		for (counterparty_id, peer_state_mtx) in per_peer_state.iter_mut() {
 488			let mut peer_state_lock = peer_state_mtx.lock().unwrap();
 489			let peer_state = &mut *peer_state_lock;
 490			for phase in peer_state.channel_by_id.values() {
 491				if let ChannelPhase::Funded(chan) = phase {
 492					let logger = WithChannelContext::from(&args.logger, &chan.context);
 493
 494					// Channels that were persisted have to be funded, otherwise they should have been
 495					// discarded.
 496					let funding_txo =
 497						chan.context.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
 498					let monitor = args
 499						.channel_monitors
 500						.get(&funding_txo)
 501						.expect("We already checked for monitor presence when loading channels");
 502					let mut max_in_flight_update_id = monitor.get_latest_update_id();
 503					if let Some(in_flight_upds) = &mut in_flight_monitor_updates {
 504						if let Some(mut chan_in_flight_upds) =
 505							in_flight_upds.remove(&(*counterparty_id, funding_txo))
 506						{
 507							max_in_flight_update_id = cmp::max(
 508								max_in_flight_update_id,
 509								handle_in_flight_updates!(
 510									*counterparty_id,
 511									chan_in_flight_upds,
 512									funding_txo,
 513									monitor,
 514									peer_state,
 515									logger,
 516									""
 517								),
 518							);
 519						}
 520					}
 521					if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
 522						// If the channel is ahead of the monitor, return DangerousValue:
 523						log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
 524						log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
 525							chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
 526						log_error!(
 527							logger,
 528							" but the ChannelManager is at update_id {}.",
 529							chan.get_latest_unblocked_monitor_update_id()
 530						);
 531						log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
 532						log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
 533						log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
 534						log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
 535						return Err(DecodeError::DangerousValue);
 536					}
 537				} else {
 538					// We shouldn't have persisted (or read) any unfunded channel types so none should have been
 539					// created in this `channel_by_id` map.
 540					debug_assert!(false);
 541					return Err(DecodeError::InvalidValue);
 542				}
 543			}
 544		}
 545
 546		if let Some(in_flight_upds) = in_flight_monitor_updates {
 547			for ((counterparty_id, funding_txo), mut chan_in_flight_updates) in in_flight_upds {
 548				let channel_id = funding_txo_to_channel_id.get(&funding_txo).copied();
 549				let logger = WithContext::from(&args.logger, Some(counterparty_id), channel_id);
 550				if let Some(monitor) = args.channel_monitors.get(&funding_txo) {
 551					// Now that we've removed all the in-flight monitor updates for channels that are
 552					// still open, we need to replay any monitor updates that are for closed channels,
  553					// creating the necessary peer_state entries as we go.
 554					let peer_state_mutex = per_peer_state
 555						.entry(counterparty_id)
 556						.or_insert_with(|| Mutex::new(peer_state_from_chans(new_hash_map())));
 557					let mut peer_state = peer_state_mutex.lock().unwrap();
 558					handle_in_flight_updates!(
 559						counterparty_id,
 560						chan_in_flight_updates,
 561						funding_txo,
 562						monitor,
 563						peer_state,
 564						logger,
 565						"closed "
 566					);
 567				} else {
 568					log_error!(logger, "A ChannelMonitor is missing even though we have in-flight updates for it! This indicates a potentially-critical violation of the chain::Watch API!");
 569					log_error!(
 570						logger,
 571						" The ChannelMonitor for channel {} is missing.",
 572						if let Some(channel_id) = channel_id {
 573							channel_id.to_string()
 574						} else {
 575							format!("with outpoint {}", funding_txo)
 576						}
 577					);
 578					log_error!(logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
 579					log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
 580					log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
 581					log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
 582					log_error!(
 583						logger,
 584						" Pending in-flight updates are: {:?}",
 585						chan_in_flight_updates
 586					);
 587					return Err(DecodeError::InvalidValue);
 588				}
 589			}
 590		}
 591
 592		// Note that we have to do the above replays before we push new monitor updates.
 593		pending_background_events.append(&mut close_background_events);
 594
 595		// If there's any preimages for forwarded HTLCs hanging around in ChannelMonitors we
 596		// should ensure we try them again on the inbound edge. We put them here and do so after we
 597		// have a fully-constructed `ChannelManager` at the end.
 598		let mut pending_claims_to_replay = Vec::new();
 599
 600		{
 601			// If we're tracking pending payments, ensure we haven't lost any by looking at the
  602			// ChannelMonitor data for any channels for which we do not have authoritative state
 603			// (i.e. those for which we just force-closed above or we otherwise don't have a
 604			// corresponding `Channel` at all).
 605			// This avoids several edge-cases where we would otherwise "forget" about pending
 606			// payments which are still in-flight via their on-chain state.
 607			// We only rebuild the pending payments map if we were most recently serialized by
 608			// 0.0.102+
 609			for (_, monitor) in args.channel_monitors.iter() {
 610				let counterparty_opt = outpoint_to_peer.get(&monitor.get_funding_txo().0);
 611				if counterparty_opt.is_none() {
 612					let logger = WithChannelMonitor::from(&args.logger, monitor);
 613					for (htlc_source, (htlc, _)) in monitor.get_pending_or_resolved_outbound_htlcs()
 614					{
 615						if let HTLCSource::OutboundRoute {
 616							payment_id, session_priv, path, ..
 617						} = htlc_source
 618						{
 619							if path.hops.is_empty() {
 620								log_error!(logger, "Got an empty path for a pending payment");
 621								return Err(DecodeError::InvalidValue);
 622							}
 623
 624							let path_amt = path.final_value_msat();
 625							let mut session_priv_bytes = [0; 32];
 626							session_priv_bytes[..].copy_from_slice(&session_priv[..]);
 627							match pending_outbounds
 628								.pending_outbound_payments
 629								.lock()
 630								.unwrap()
 631								.entry(payment_id)
 632							{
 633								hash_map::Entry::Occupied(mut entry) => {
 634									let newly_added =
 635										entry.get_mut().insert(session_priv_bytes, &path);
 636									log_info!(logger, "{} a pending payment path for {} msat for session priv {} on an existing pending payment with payment hash {}",
 637										if newly_added { "Added" } else { "Had" }, path_amt, log_bytes!(session_priv_bytes), htlc.payment_hash);
 638								},
 639								hash_map::Entry::Vacant(entry) => {
 640									let path_fee = path.fee_msat();
 641									entry.insert(PendingOutboundPayment::Retryable {
 642										retry_strategy: None,
 643										attempts: PaymentAttempts::new(),
 644										payment_params: None,
 645										session_privs: hash_set_from_iter([session_priv_bytes]),
 646										payment_hash: htlc.payment_hash,
 647										payment_secret: None, // only used for retries, and we'll never retry on startup
 648										payment_metadata: None, // only used for retries, and we'll never retry on startup
 649										keysend_preimage: None, // only used for retries, and we'll never retry on startup
 650										custom_tlvs: Vec::new(), // only used for retries, and we'll never retry on startup
 651										pending_amt_msat: path_amt,
 652										pending_fee_msat: Some(path_fee),
 653										total_msat: path_amt,
 654										starting_block_height: best_block_height,
 655										remaining_max_total_routing_fee_msat: None, // only used for retries, and we'll never retry on startup
 656									});
 657									log_info!(logger, "Added a pending payment for {} msat with payment hash {} for path with session priv {}",
 658										path_amt, &htlc.payment_hash,  log_bytes!(session_priv_bytes));
 659								},
 660							}
 661						}
 662					}
 663					for (htlc_source, (htlc, preimage_opt)) in
 664						monitor.get_all_current_outbound_htlcs()
 665					{
 666						match htlc_source {
 667							HTLCSource::PreviousHopData(prev_hop_data) => {
 668								let pending_forward_matches_htlc = |info: &PendingAddHTLCInfo| {
 669									info.prev_funding_outpoint == prev_hop_data.outpoint
 670										&& info.prev_htlc_id == prev_hop_data.htlc_id
 671								};
 672								// The ChannelMonitor is now responsible for this HTLC's
 673								// failure/success and will let us know what its outcome is. If we
 674								// still have an entry for this HTLC in `forward_htlcs` or
 675								// `pending_intercepted_htlcs`, we were apparently not persisted after
 676								// the monitor was when forwarding the payment.
 677								decode_update_add_htlcs.retain(|scid, update_add_htlcs| {
 678									update_add_htlcs.retain(|update_add_htlc| {
 679										let matches = *scid == prev_hop_data.short_channel_id &&
 680											update_add_htlc.htlc_id == prev_hop_data.htlc_id;
 681										if matches {
 682											log_info!(logger, "Removing pending to-decode HTLC with hash {} as it was forwarded to the closed channel {}",
 683												&htlc.payment_hash, &monitor.channel_id());
 684										}
 685										!matches
 686									});
 687									!update_add_htlcs.is_empty()
 688								});
 689								forward_htlcs.retain(|_, forwards| {
 690									forwards.retain(|forward| {
 691										if let HTLCForwardInfo::AddHTLC(htlc_info) = forward {
 692											if pending_forward_matches_htlc(&htlc_info) {
 693												log_info!(logger, "Removing pending to-forward HTLC with hash {} as it was forwarded to the closed channel {}",
 694													&htlc.payment_hash, &monitor.channel_id());
 695												false
 696											} else { true }
 697										} else { true }
 698									});
 699									!forwards.is_empty()
 700								});
 701								pending_intercepted_htlcs.as_mut().unwrap().retain(|intercepted_id, htlc_info| {
 702									if pending_forward_matches_htlc(&htlc_info) {
 703										log_info!(logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
 704											&htlc.payment_hash, &monitor.channel_id());
 705										pending_events_read.retain(|(event, _)| {
 706											if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
 707												intercepted_id != ev_id
 708											} else { true }
 709										});
 710										false
 711									} else { true }
 712								});
 713							},
 714							HTLCSource::OutboundRoute {
 715								payment_id, session_priv, path, ..
 716							} => {
 717								if let Some(preimage) = preimage_opt {
 718									let pending_events = Mutex::new(pending_events_read);
 719									// Note that we set `from_onchain` to "false" here,
 720									// deliberately keeping the pending payment around forever.
 721									// Given it should only occur when we have a channel we're
 722									// force-closing for being stale that's okay.
 723									// The alternative would be to wipe the state when claiming,
 724									// generating a `PaymentPathSuccessful` event but regenerating
 725									// it and the `PaymentSent` on every restart until the
 726									// `ChannelMonitor` is removed.
 727									let compl_action =
 728										EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
 729											channel_funding_outpoint: monitor.get_funding_txo().0,
 730											channel_id: monitor.channel_id(),
 731											counterparty_node_id: path.hops[0].pubkey,
 732										};
 733									pending_outbounds.claim_htlc(
 734										payment_id,
 735										preimage,
 736										session_priv,
 737										path,
 738										false,
 739										compl_action,
 740										&pending_events,
 741										&&logger,
 742									);
 743									pending_events_read = pending_events.into_inner().unwrap();
 744								}
 745							},
 746						}
 747					}
 748				}
 749
 750				// Whether the downstream channel was closed or not, try to re-apply any payment
 751				// preimages from it which may be needed in upstream channels for forwarded
 752				// payments.
 753				let outbound_claimed_htlcs_iter = monitor
 754					.get_all_current_outbound_htlcs()
 755					.into_iter()
 756					.filter_map(|(htlc_source, (htlc, preimage_opt))| {
 757						if let HTLCSource::PreviousHopData(_) = htlc_source {
 758							if let Some(payment_preimage) = preimage_opt {
 759								Some((
 760									htlc_source,
 761									payment_preimage,
 762									htlc.amount_msat,
 763									htlc.amount_rgb,
 764									// Check if `counterparty_opt.is_none()` to see if the
 765									// downstream chan is closed (because we don't have a
 766									// channel_id -> peer map entry).
 767									counterparty_opt.is_none(),
 768									counterparty_opt
 769										.cloned()
 770										.or(monitor.get_counterparty_node_id()),
 771									monitor.get_funding_txo().0,
 772									monitor.channel_id(),
 773								))
 774							} else {
 775								None
 776							}
 777						} else {
 778							// If it was an outbound payment, we've handled it above - if a preimage
 779							// came in and we persisted the `ChannelManager` we either handled it and
 780							// are good to go or the channel force-closed - we don't have to handle the
 781							// channel still live case here.
 782							None
 783						}
 784					});
 785				for tuple in outbound_claimed_htlcs_iter {
 786					pending_claims_to_replay.push(tuple);
 787				}
 788			}
 789		}
 790
 791		if !forward_htlcs.is_empty()
 792			|| !decode_update_add_htlcs.is_empty()
 793			|| pending_outbounds.needs_abandon()
 794		{
 795			// If we have pending HTLCs to forward, assume we either dropped a
 796			// `PendingHTLCsForwardable` or the user received it but never processed it as they
 797			// shut down before the timer hit. Either way, set the time_forwardable to a small
 798			// constant as enough time has likely passed that we should simply handle the forwards
 799			// now, or at least after the user gets a chance to reconnect to our peers.
 800			pending_events_read.push_back((
 801				events::Event::PendingHTLCsForwardable { time_forwardable: Duration::from_secs(2) },
 802				None,
 803			));
 804		}
 805
 806		let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
 807		let expanded_inbound_key = inbound_payment::ExpandedKey::new(&inbound_pmt_key_material);
 808
 809		let mut claimable_payments = hash_map_with_capacity(claimable_htlcs_list.len());
 810		if let Some(purposes) = claimable_htlc_purposes {
 811			if purposes.len() != claimable_htlcs_list.len() {
 812				return Err(DecodeError::InvalidValue);
 813			}
 814			if let Some(onion_fields) = claimable_htlc_onion_fields {
 815				if onion_fields.len() != claimable_htlcs_list.len() {
 816					return Err(DecodeError::InvalidValue);
 817				}
 818				for (purpose, (onion, (payment_hash, htlcs))) in purposes
 819					.into_iter()
 820					.zip(onion_fields.into_iter().zip(claimable_htlcs_list.into_iter()))
 821				{
 822					let existing_payment = claimable_payments.insert(
 823						payment_hash,
 824						ClaimablePayment { purpose, htlcs, onion_fields: onion },
 825					);
 826					if existing_payment.is_some() {
 827						return Err(DecodeError::InvalidValue);
 828					}
 829				}
 830			} else {
 831				for (purpose, (payment_hash, htlcs)) in
 832					purposes.into_iter().zip(claimable_htlcs_list.into_iter())
 833				{
 834					let existing_payment = claimable_payments.insert(
 835						payment_hash,
 836						ClaimablePayment { purpose, htlcs, onion_fields: None },
 837					);
 838					if existing_payment.is_some() {
 839						return Err(DecodeError::InvalidValue);
 840					}
 841				}
 842			}
 843		} else {
 844			// LDK versions prior to 0.0.107 did not write a `pending_htlc_purposes`, but do
 845			// include a `_legacy_hop_data` in the `OnionPayload`.
 846			for (payment_hash, htlcs) in claimable_htlcs_list.drain(..) {
 847				if htlcs.is_empty() {
 848					return Err(DecodeError::InvalidValue);
 849				}
 850				let purpose = match &htlcs[0].onion_payload {
 851					OnionPayload::Invoice { _legacy_hop_data } => {
 852						if let Some(hop_data) = _legacy_hop_data {
 853							events::PaymentPurpose::Bolt11InvoicePayment {
 854								payment_preimage: match pending_inbound_payments.get(&payment_hash)
 855								{
 856									Some(inbound_payment) => inbound_payment.payment_preimage,
 857									None => match inbound_payment::verify(
 858										payment_hash,
 859										&hop_data,
 860										0,
 861										&expanded_inbound_key,
 862										&args.logger,
 863									) {
 864										Ok((payment_preimage, _)) => payment_preimage,
 865										Err(()) => {
 866											log_error!(args.logger, "Failed to read claimable payment data for HTLC with payment hash {} - was not a pending inbound payment and didn't match our payment key", &payment_hash);
 867											return Err(DecodeError::InvalidValue);
 868										},
 869									},
 870								},
 871								payment_secret: hop_data.payment_secret,
 872							}
 873						} else {
 874							return Err(DecodeError::InvalidValue);
 875						}
 876					},
 877					OnionPayload::Spontaneous(payment_preimage) => {
 878						events::PaymentPurpose::SpontaneousPayment(*payment_preimage)
 879					},
 880				};
 881				claimable_payments
 882					.insert(payment_hash, ClaimablePayment { purpose, htlcs, onion_fields: None });
 883			}
 884		}
 885
 886		let mut secp_ctx = Secp256k1::new();
 887		secp_ctx.seeded_randomize(&args.entropy_source.get_secure_random_bytes());
 888
 889		let our_network_pubkey = match args.node_signer.get_node_id(Recipient::Node) {
 890			Ok(key) => key,
 891			Err(()) => return Err(DecodeError::InvalidValue),
 892		};
 893		if let Some(network_pubkey) = received_network_pubkey {
 894			if network_pubkey != our_network_pubkey {
 895				log_error!(
 896					args.logger,
 897					"Key that was generated does not match the existing key. left: {}, right: {}",
 898					network_pubkey,
 899					our_network_pubkey
 900				);
 901				return Err(DecodeError::InvalidValue);
 902			}
 903		}
 904
 905		let mut outbound_scid_aliases = new_hash_set();
 906		for (_peer_node_id, peer_state_mutex) in per_peer_state.iter_mut() {
 907			let mut peer_state_lock = peer_state_mutex.lock().unwrap();
 908			let peer_state = &mut *peer_state_lock;
 909			for (chan_id, phase) in peer_state.channel_by_id.iter_mut() {
 910				if let ChannelPhase::Funded(chan) = phase {
 911					let logger = WithChannelContext::from(&args.logger, &chan.context);
 912					if chan.context.outbound_scid_alias() == 0 {
 913						let mut outbound_scid_alias;
 914						loop {
 915							outbound_scid_alias = fake_scid::Namespace::OutboundAlias
 916								.get_fake_scid(
 917									best_block_height,
 918									&chain_hash,
 919									fake_scid_rand_bytes.as_ref().unwrap(),
 920									&args.entropy_source,
 921								);
 922							if outbound_scid_aliases.insert(outbound_scid_alias) {
 923								break;
 924							}
 925						}
 926						chan.context.set_outbound_scid_alias(outbound_scid_alias);
 927					} else if !outbound_scid_aliases.insert(chan.context.outbound_scid_alias()) {
  928						// Note that in rare cases it's possible to hit this while reading an older
 929						// channel if we just happened to pick a colliding outbound alias above.
 930						log_error!(
 931							logger,
 932							"Got duplicate outbound SCID alias; {}",
 933							chan.context.outbound_scid_alias()
 934						);
 935						return Err(DecodeError::InvalidValue);
 936					}
 937					if chan.context.is_usable() {
 938						if short_to_chan_info
 939							.insert(
 940								chan.context.outbound_scid_alias(),
 941								(chan.context.get_counterparty_node_id(), *chan_id),
 942							)
 943							.is_some()
 944						{
  945							// Note that in rare cases it's possible to hit this while reading an older
 946							// channel if we just happened to pick a colliding outbound alias above.
 947							log_error!(
 948								logger,
 949								"Got duplicate outbound SCID alias; {}",
 950								chan.context.outbound_scid_alias()
 951							);
 952							return Err(DecodeError::InvalidValue);
 953						}
 954					}
 955				} else {
 956					// We shouldn't have persisted (or read) any unfunded channel types so none should have been
 957					// created in this `channel_by_id` map.
 958					debug_assert!(false);
 959					return Err(DecodeError::InvalidValue);
 960				}
 961			}
 962		}
 963
 964		let bounded_fee_estimator = LowerBoundedFeeEstimator::new(args.fee_estimator);
 965
 966		for (_, monitor) in args.channel_monitors.iter() {
 967			for (payment_hash, payment_preimage) in monitor.get_stored_preimages() {
 968				if let Some(payment) = claimable_payments.remove(&payment_hash) {
 969					log_info!(args.logger, "Re-claiming HTLCs with payment hash {} as we've released the preimage to a ChannelMonitor!", &payment_hash);
 970					let mut claimable_amt_msat = 0;
 971					let mut receiver_node_id = Some(our_network_pubkey);
 972					let phantom_shared_secret = payment.htlcs[0].prev_hop.phantom_shared_secret;
 973					if phantom_shared_secret.is_some() {
 974						let phantom_pubkey = args
 975							.node_signer
 976							.get_node_id(Recipient::PhantomNode)
 977							.expect("Failed to get node_id for phantom node recipient");
 978						receiver_node_id = Some(phantom_pubkey)
 979					}
 980					for claimable_htlc in &payment.htlcs {
 981						claimable_amt_msat += claimable_htlc.value;
 982
 983						// Add a holding-cell claim of the payment to the Channel, which should be
 984						// applied ~immediately on peer reconnection. Because it won't generate a
 985						// new commitment transaction we can just provide the payment preimage to
 986						// the corresponding ChannelMonitor and nothing else.
 987						//
 988						// We do so directly instead of via the normal ChannelMonitor update
 989						// procedure as the ChainMonitor hasn't yet been initialized, implying
 990						// we're not allowed to call it directly yet. Further, we do the update
 991						// without incrementing the ChannelMonitor update ID as there isn't any
 992						// reason to.
 993						// If we were to generate a new ChannelMonitor update ID here and then
 994						// crash before the user finishes block connect we'd end up force-closing
 995						// this channel as well. On the flip side, there's no harm in restarting
 996						// without the new monitor persisted - we'll end up right back here on
 997						// restart.
 998						let previous_channel_id = claimable_htlc.prev_hop.channel_id;
 999						if let Some(peer_node_id) =
1000							outpoint_to_peer.get(&claimable_htlc.prev_hop.outpoint)
1001						{
1002							let peer_state_mutex = per_peer_state.get(peer_node_id).unwrap();
1003							let mut peer_state_lock = peer_state_mutex.lock().unwrap();
1004							let peer_state = &mut *peer_state_lock;
1005							if let Some(ChannelPhase::Funded(channel)) =
1006								peer_state.channel_by_id.get_mut(&previous_channel_id)
1007							{
1008								let logger =
1009									WithChannelContext::from(&args.logger, &channel.context);
1010								channel.claim_htlc_while_disconnected_dropping_mon_update(
1011									claimable_htlc.prev_hop.htlc_id,
1012									payment_preimage,
1013									&&logger,
1014								);
1015							}
1016						}
1017						if let Some(previous_hop_monitor) =
1018							args.channel_monitors.get(&claimable_htlc.prev_hop.outpoint)
1019						{
1020							previous_hop_monitor.provide_payment_preimage(
1021								&payment_hash,
1022								&payment_preimage,
1023								&args.tx_broadcaster,
1024								&bounded_fee_estimator,
1025								&args.logger,
1026							);
1027						}
1028					}
1029					pending_events_read.push_back((
1030						events::Event::PaymentClaimed {
1031							receiver_node_id,
1032							payment_hash,
1033							purpose: payment.purpose,
1034							amount_msat: claimable_amt_msat,
1035							htlcs: payment.htlcs.iter().map(events::ClaimedHTLC::from).collect(),
1036							sender_intended_total_msat: payment
1037								.htlcs
1038								.first()
1039								.map(|htlc| htlc.total_msat),
1040						},
1041						None,
1042					));
1043				}
1044			}
1045		}
1046
1047		for (node_id, monitor_update_blocked_actions) in
1048			monitor_update_blocked_actions_per_peer.unwrap()
1049		{
1050			if let Some(peer_state) = per_peer_state.get(&node_id) {
1051				for (channel_id, actions) in monitor_update_blocked_actions.iter() {
1052					let logger = WithContext::from(&args.logger, Some(node_id), Some(*channel_id));
1053					for action in actions.iter() {
1054						if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
1055							downstream_counterparty_and_funding_outpoint:
1056								Some((
1057									blocked_node_id,
1058									_blocked_channel_outpoint,
1059									blocked_channel_id,
1060									blocking_action,
1061								)),
1062							..
1063						} = action
1064						{
1065							if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
1066								log_trace!(logger,
1067									"Holding the next revoke_and_ack from {} until the preimage is durably persisted in the inbound edge's ChannelMonitor",
1068									blocked_channel_id);
1069								blocked_peer_state
1070									.lock()
1071									.unwrap()
1072									.actions_blocking_raa_monitor_updates
1073									.entry(*blocked_channel_id)
1074									.or_insert_with(Vec::new)
1075									.push(blocking_action.clone());
1076							} else {
1077								// If the channel we were blocking has closed, we don't need to
1078								// worry about it - the blocked monitor update should never have
1079								// been released from the `Channel` object so it can't have
1080								// completed, and if the channel closed there's no reason to bother
1081								// anymore.
1082							}
1083						}
1084						if let MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
1085							..
1086						} = action
1087						{
1088							debug_assert!(false, "Non-event-generating channel freeing should not appear in our queue");
1089						}
1090					}
1091				}
1092				peer_state.lock().unwrap().monitor_update_blocked_actions =
1093					monitor_update_blocked_actions;
1094			} else {
1095				log_error!(
1096					WithContext::from(&args.logger, Some(node_id), None),
1097					"Got blocked actions without a per-peer-state for {}",
1098					node_id
1099				);
1100				return Err(DecodeError::InvalidValue);
1101			}
1102		}
1103
1104		let channel_manager = ChannelManager {
1105			chain_hash,
1106			fee_estimator: bounded_fee_estimator,
1107			chain_monitor: args.chain_monitor,
1108			tx_broadcaster: args.tx_broadcaster,
1109			router: args.router,
1110
1111			best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)),
1112
1113			inbound_payment_key: expanded_inbound_key,
1114			pending_inbound_payments: Mutex::new(pending_inbound_payments),
1115			pending_outbound_payments: pending_outbounds,
1116			pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs.unwrap()),
1117
1118			forward_htlcs: Mutex::new(forward_htlcs),
1119			decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
1120			claimable_payments: Mutex::new(ClaimablePayments {
1121				claimable_payments,
1122				pending_claiming_payments: pending_claiming_payments.unwrap(),
1123			}),
1124			outbound_scid_aliases: Mutex::new(outbound_scid_aliases),
1125			outpoint_to_peer: Mutex::new(outpoint_to_peer),
1126			short_to_chan_info: FairRwLock::new(short_to_chan_info),
1127			fake_scid_rand_bytes: fake_scid_rand_bytes.unwrap(),
1128
1129			probing_cookie_secret: probing_cookie_secret.unwrap(),
1130
1131			our_network_pubkey,
1132			secp_ctx,
1133
1134			highest_seen_timestamp: AtomicUsize::new(highest_seen_timestamp as usize),
1135
1136			per_peer_state: FairRwLock::new(per_peer_state),
1137
1138			pending_events: Mutex::new(pending_events_read),
1139			pending_events_processor: AtomicBool::new(false),
1140			pending_background_events: Mutex::new(pending_background_events),
1141			total_consistency_lock: RwLock::new(()),
1142			background_events_processed_since_startup: AtomicBool::new(false),
1143
1144			event_persist_notifier: Notifier::new(),
1145			needs_persist_flag: AtomicBool::new(false),
1146
1147			funding_batch_states: Mutex::new(BTreeMap::new()),
1148
1149			pending_offers_messages: Mutex::new(Vec::new()),
1150
1151			pending_broadcast_messages: Mutex::new(Vec::new()),
1152
1153			entropy_source: args.entropy_source,
1154			node_signer: args.node_signer,
1155			signer_provider: args.signer_provider,
1156
1157			logger: args.logger,
1158			default_configuration: args.default_config,
1159			color_source: args.color_source,
1160		};
1161
1162		for htlc_source in failed_htlcs.drain(..) {
1163			let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
1164			let receiver =
1165				HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
1166			let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
1167			channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
1168		}
1169
1170		for (
1171			source,
1172			preimage,
1173			downstream_value,
1174			downstream_rgb,
1175			downstream_closed,
1176			downstream_node_id,
1177			downstream_funding,
1178			downstream_channel_id,
1179		) in pending_claims_to_replay
1180		{
1181			// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
1182			// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
1183			// channel is closed we just assume that it probably came from an on-chain claim.
1184			channel_manager.claim_funds_internal(
1185				source,
1186				preimage,
1187				Some(downstream_value),
1188				downstream_rgb,
1189				None,
1190				downstream_closed,
1191				true,
1192				downstream_node_id,
1193				downstream_funding,
1194				downstream_channel_id,
1195				None,
1196			);
1197		}
1198
1199		//TODO: Broadcast channel update for closed channels, but only after we've made a
1200		//connection or two.
1201
1202		Ok((best_block_hash.clone(), channel_manager))
1203	}
1204}

Walkthrough

Here is how I read it.

First: I don't read it at all! I select Readable::read and press Ctrl+Shift+L, which in my editor selects every occurrence of the same text. From there I expand each selection to the whole line.
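
If your editor lacks that feature, a few lines of throwaway Rust produce the same first cut (the file path is an assumption; point it at your own checkout):

    use std::fs;

    // Print every line mentioning the deserialization entry point, with its
    // line number, mimicking "select all occurrences, expand to line".
    fn main() -> std::io::Result<()> {
        let src = fs::read_to_string("lightning/src/ln/channelmanager.rs")?;
        for (i, line) in src.lines().enumerate() {
            if line.contains("Readable::read") {
                println!("{:>5}: {}", i + 1, line.trim_end());
            }
        }
        Ok(())
    }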

Since some of the deserialization sits inside loops, one more quick manual pass fills those in. That produces the code below:

 1		let chain_hash: ChainHash = Readable::read(reader)?;
 2		let best_block_height: u32 = Readable::read(reader)?;
 3		let best_block_hash: BlockHash = Readable::read(reader)?;
 4		let channel_count: u64 = Readable::read(reader)?;
 5		let forward_htlcs_count: u64 = Readable::read(reader)?;
 6		for _ in 0..forward_htlcs_count {
 7			let short_channel_id = Readable::read(reader)?;
 8			let pending_forwards_count: u64 = Readable::read(reader)?;
 9				pending_forwards.push(Readable::read(reader)?);
10		}
11		let claimable_htlcs_count: u64 = Readable::read(reader)?;
12		for _ in 0..claimable_htlcs_count {
13			let payment_hash = Readable::read(reader)?;
14			let previous_hops_len: u64 = Readable::read(reader)?;
15		}
16		let peer_count: u64 = Readable::read(reader)?;
17		for _ in 0..peer_count {
18			let peer_pubkey = Readable::read(reader)?;
19			peer_state.latest_features = Readable::read(reader)?;
20		}
21		let event_count: u64 = Readable::read(reader)?;
22		for _ in 0..event_count {
23			match MaybeReadable::read(reader)? {
24		}
25		let background_event_count: u64 = Readable::read(reader)?;
26		for _ in 0..background_event_count {
27					let _: OutPoint = Readable::read(reader)?;
28					let _: ChannelMonitorUpdate = Readable::read(reader)?;
29		}
30		let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111
31		let highest_seen_timestamp: u32 = Readable::read(reader)?;
32		let pending_inbound_payment_count: u64 = Readable::read(reader)?;
33		for _ in 0..pending_inbound_payment_count {
34				.insert(Readable::read(reader)?, Readable::read(reader)?)
35		}
36		let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?;
37		for _ in 0..pending_outbound_payments_count_compat {
38			let session_priv = Readable::read(reader)?;
39		}

This can still leave stragglers, so next, search for reader to find what was missed:

 1		let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION);
 2		for _ in 0..channel_count {
 3			let mut channel: Channel<SP> = Channel::read(
 4				reader,
 5				(
 6					&args.entropy_source,
 7					&args.signer_provider,
 8					best_block_height,
 9					&provided_channel_type_features(&args.default_config),
10					args.color_source.clone(),
11				),
12			)?;
13		}
14		for _ in 0..claimable_htlcs_count {
15			for _ in 0..previous_hops_len {
16				previous_hops.push(<ClaimableHTLC as Readable>::read(reader)?);
17			}
18		}
19		for _ in 0..background_event_count {
20			match <u8 as Readable>::read(reader)? {
21				0 => {
22					// LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here,
23					// however we really don't (and never did) need them - we regenerate all
24					// on-startup monitor updates.
25				},
26				_ => return Err(DecodeError::InvalidValue),
27			}
28		}
29		read_tlv_fields!(reader, {
30			(1, pending_outbound_payments_no_retry, option),
31			(2, pending_intercepted_htlcs, option),
32			(3, pending_outbound_payments, option),
33			(4, pending_claiming_payments, option),
34			(5, received_network_pubkey, option),
35			(6, monitor_update_blocked_actions_per_peer, option),
36			(7, fake_scid_rand_bytes, option),
37			(8, events_override, option),
38			(9, claimable_htlc_purposes, optional_vec),
39			(10, in_flight_monitor_updates, option),
40			(11, probing_cookie_secret, option),
41			(13, claimable_htlc_onion_fields, optional_vec),
42			(14, decode_update_add_htlcs, option),
43		});

Good, that's everything.
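
Zooming out, the two passes expose the on-disk skeleton: a version prefix, a few fixed header fields, a series of count-prefixed lists, and a trailing TLV section. The toy reader below restates that shape (std-only; the field widths and names are made up, the real layout is defined by LDK's Readable impls). Note the capacity clamp, mirroring the cmp::min(...) calls in the real code that keep a corrupt or hostile count from forcing a giant allocation:

    use std::io::{self, Read};

    fn read_u32<R: Read>(r: &mut R) -> io::Result<u32> {
        let mut b = [0u8; 4];
        r.read_exact(&mut b)?;
        Ok(u32::from_be_bytes(b))
    }

    fn read_u64<R: Read>(r: &mut R) -> io::Result<u64> {
        let mut b = [0u8; 8];
        r.read_exact(&mut b)?;
        Ok(u64::from_be_bytes(b))
    }

    fn read_manager_skeleton<R: Read>(r: &mut R) -> io::Result<()> {
        let mut ver = [0u8; 1];
        r.read_exact(&mut ver)?;            // version prefix, cf. read_ver_prefix!
        let _chain_hash = read_u32(r)?;     // fixed header fields (widths are toy values)
        let _best_block_height = read_u32(r)?;

        let channel_count = read_u64(r)?;
        // Clamp the pre-allocation: trust the count for iteration,
        // never for up-front memory.
        let mut channels = Vec::with_capacity(channel_count.min(128) as usize);
        for _ in 0..channel_count {
            channels.push(read_u64(r)?);    // stand-in for Channel::read(reader, args)
        }
        // ...more count-prefixed lists follow, then the trailing TLV section.
        Ok(())
    }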

Final Result

No.  Field  Type  Description
1  chain_hash  ChainHash  Identifier of the blockchain network.
2  best_block_height  u32  Height of the best (most recent) block.
3  best_block_hash  BlockHash  Hash of the best block.
4  channel_count  u64  Total number of channels under management.
5  forward_htlcs_count  u64  Number of forwarded-HTLC entries.
6  Forwarded HTLCs
   short_channel_id  ShortChannelId  Identifier of the short channel.
   pending_forwards_count  u64  Number of pending forward HTLCs on this channel.
   pending_forwards  PendingForward (list)  List of pending forward HTLCs.
7  claimable_htlcs_count  u64  Number of claimable HTLCs.
8  Claimable HTLCs
   payment_hash  PaymentHash  Payment hash associated with the HTLC.
   previous_hops_len  u64  Number of previous hops on the HTLC's path.
9  peer_count  u64  Total number of connected peers.
10  Peers
   peer_pubkey  PubKey  Public key of the peer.
   peer_state.latest_features  LatestFeatures  Latest feature set supported by the peer.
11  event_count  u64  Number of recorded events.
12  Events    List of events deserialized via MaybeReadable.
13  background_event_count  u64  Number of background events.
14  Background events
   OutPoint  OutPoint  Input referencing a specific UTXO.
   ChannelMonitorUpdate  ChannelMonitorUpdate  Update data for a channel monitor.
15  _last_node_announcement_serial  u32  (Deprecated) Serial number of the last node announcement (only used < 0.0.111).
16  highest_seen_timestamp  u32  Most recently seen timestamp.
17  pending_inbound_payment_count  u64  Number of pending inbound payments.
18  Pending inbound payments
   PaymentHash  PaymentHash  Payment hash identifying the inbound payment.
   PendingInboundPayment  PendingInboundPayment  Data associated with the pending inbound payment.
19  pending_outbound_payments_count_compat  u64  Number of pending outbound payments (compatibility path).
20  Pending outbound payments (compat)
   session_priv  SessionPriv  Session private data for the outbound payment.
21  _ver  u8 (version prefix)  Serialization version prefix (checked against SERIALIZATION_VERSION).
22  Channels
   Channel<SP>  Channel (list)  List of channel states, deserialized with contextual arguments.
23  Previous hops of claimable HTLCs
   ClaimableHTLC  ClaimableHTLC (list)  Previous-hop details for each claimable HTLC.
24  Background event validation
   u8  u8  Tag of the background event; only 0 is accepted (LDK < 0.0.116).
25  TLV fields (see the sketch after this table)
   1: pending_outbound_payments_no_retry  optional  Pending outbound payments without retry data.
   2: pending_intercepted_htlcs  optional  Pending intercepted HTLCs.
   3: pending_outbound_payments  optional  Retryable pending outbound payments.
   4: pending_claiming_payments  optional  Payments in the process of being claimed.
   5: received_network_pubkey  optional  Network public key received from the node.
   6: monitor_update_blocked_actions_per_peer  optional  Monitor-update-blocked actions, per peer.
   7: fake_scid_rand_bytes  optional  Random bytes for fake short channel IDs (SCIDs).
   8: events_override  optional  Override for the pending event list.
   9: claimable_htlc_purposes  optional_vec  Purposes associated with the claimable HTLCs.
   10: in_flight_monitor_updates  optional  Monitor updates currently in flight.
   11: probing_cookie_secret  optional  Secret used for probing cookies.
   13: claimable_htlc_onion_fields  optional_vec  Onion fields of the claimable HTLCs.
   14: decode_update_add_htlcs  optional  update_add_htlc messages awaiting decoding.
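
One note on the trailing read_tlv_fields! section: TLV (type-length-value) records are what let newer LDK versions append optional fields without breaking older data, which is why the type numbers above are sparse (12 is absent). The toy decoder below conveys the idea; the framing is deliberately simplified, since the real format encodes types and lengths as variable-size integers:

    use std::io::{self, Read};

    // Toy framing: 1-byte type, 2-byte big-endian length, then the value.
    fn read_tlv_stream<R: Read>(r: &mut R) -> io::Result<Vec<(u8, Vec<u8>)>> {
        let mut records = Vec::new();
        let mut ty = [0u8; 1];
        while r.read(&mut ty)? == 1 {
            let mut len_buf = [0u8; 2];
            r.read_exact(&mut len_buf)?;
            let len = u16::from_be_bytes(len_buf) as usize;
            let mut value = vec![0u8; len];
            r.read_exact(&mut value)?;
            records.push((ty[0], value));
        }
        Ok(records)
    }

Because every record carries its own length, a decoder can skip types it does not recognize, which is how optional fields can be added across versions without breaking existing readers.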

Summary

Efficiency, efficiency, and efficiency again!

When reading this kind of code, long and business-heavy but logically simple, the key is not to brute-force your way through. Focus instead on how each input of the function is manipulated and what output or side effect it produces. That keeps reading efficient and gets you to the goal you set. Along the way, explore techniques of your own (PRs to add them are welcome!).