// Copyright 2019-2022 PureStake Inc.
// This file is part of Moonbeam.

// Moonbeam is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Moonbeam is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Moonbeam.  If not, see <http://www.gnu.org/licenses/>.

//! This module assembles the Moonbeam service components, executes them, and manages communication
//! between them. This is the backbone of the client-side node implementation.
//!
//! This module can assemble:
//! PartialComponents: For maintenance tasks without a complete node (e.g. import/export blocks, purge)
//! Full Service: A complete parachain node including the pool, rpc, network, embedded relay chain
//! Dev Service: A leaner service without the relay chain backing.
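//!
//! As a rough sketch (argument lists abbreviated; the real signatures are
//! defined later in this module), the three assemblies are used like this:
//!
//! ```ignore
//! // Chain operations only (import/export blocks, purge, ...):
//! let (client, backend, import_queue, task_manager) =
//! 	new_chain_ops(&mut config, &rpc_config)?;
//!
//! // Full parachain node:
//! let (task_manager, client) = start_node::<RuntimeApi, Customizations>(
//! 	parachain_config, polkadot_config, collator_options, para_id, rpc_config,
//! 	async_backing, block_authoring_duration, hwbench,
//! ).await?;
//!
//! // Development node (manual seal, mocked parachain inherent):
//! let task_manager = new_dev::<RuntimeApi, Customizations, Net>(
//! 	config, None, sealing, rpc_config, hwbench,
//! ).await?;
//! ```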

pub mod rpc;

use cumulus_client_cli::CollatorOptions;
use cumulus_client_collator::service::CollatorService;
use cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport;
use cumulus_client_consensus_proposer::Proposer;
use cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig};
use cumulus_client_service::{
	prepare_node_config, start_relay_chain_tasks, CollatorSybilResistance, DARecoveryProfile,
	ParachainHostFunctions, StartRelayChainTasksParams,
};
use cumulus_primitives_core::relay_chain::CollatorPair;
use cumulus_primitives_core::ParaId;
use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface, RelayChainResult};
use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
use fc_consensus::FrontierBlockImport as TFrontierBlockImport;
use fc_db::DatabaseSource;
use fc_rpc::StorageOverrideHandler;
use fc_rpc_core::types::{FeeHistoryCache, FilterPool};
use futures::{FutureExt, StreamExt};
use maplit::hashmap;
#[cfg(feature = "moonbase-native")]
pub use moonbase_runtime;
use moonbeam_cli_opt::{EthApi as EthApiCmd, FrontierBackendConfig, RpcConfig};
#[cfg(feature = "moonbeam-native")]
pub use moonbeam_runtime;
use moonbeam_vrf::VrfDigestsProvider;
#[cfg(feature = "moonriver-native")]
pub use moonriver_runtime;
use nimbus_consensus::NimbusManualSealConsensusDataProvider;
use nimbus_primitives::{DigestsProvider, NimbusId};
use sc_client_api::{
	backend::{AuxStore, Backend, StateBackend, StorageProvider},
	ExecutorProvider,
};
use sc_consensus::ImportQueue;
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NetworkBlock};
use sc_service::config::PrometheusConfig;
use sc_service::{
	error::Error as ServiceError, ChainSpec, Configuration, PartialComponents, TFullBackend,
	TFullClient, TaskManager,
};
use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerHandle};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::{ConstructRuntimeApi, ProvideRuntimeApi};
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_consensus::SyncOracle;
use sp_core::{ByteArray, Encode, H256};
use sp_keystore::{Keystore, KeystorePtr};
use std::str::FromStr;
use std::sync::Arc;
use std::{collections::BTreeMap, path::Path, sync::Mutex, time::Duration};
use substrate_prometheus_endpoint::Registry;

pub use client::*;
pub mod chain_spec;
mod client;

type FullClient<RuntimeApi> = TFullClient<Block, RuntimeApi, WasmExecutor<HostFunctions>>;
type FullBackend = TFullBackend<Block>;

type MaybeSelectChain = Option<sc_consensus::LongestChain<FullBackend, Block>>;
type FrontierBlockImport<RuntimeApi> =
	TFrontierBlockImport<Block, Arc<FullClient<RuntimeApi>>, FullClient<RuntimeApi>>;
type ParachainBlockImport<RuntimeApi> =
	TParachainBlockImport<Block, FrontierBlockImport<RuntimeApi>, FullBackend>;
type PartialComponentsResult<RuntimeApi> = Result<
	PartialComponents<
		FullClient<RuntimeApi>,
		FullBackend,
		MaybeSelectChain,
		sc_consensus::DefaultImportQueue<Block>,
		sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>,
		(
			BlockImportPipeline<FrontierBlockImport<RuntimeApi>, ParachainBlockImport<RuntimeApi>>,
			Option<FilterPool>,
			Option<Telemetry>,
			Option<TelemetryWorkerHandle>,
			Arc<fc_db::Backend<Block, FullClient<RuntimeApi>>>,
			FeeHistoryCache,
		),
	>,
	ServiceError,
>;

#[cfg(feature = "runtime-benchmarks")]
pub type HostFunctions = (
	frame_benchmarking::benchmarking::HostFunctions,
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);
#[cfg(not(feature = "runtime-benchmarks"))]
pub type HostFunctions = (
	ParachainHostFunctions,
	moonbeam_primitives_ext::moonbeam_ext::HostFunctions,
);

/// Block import pipeline used.
pub enum BlockImportPipeline<T, E> {
	/// Used in dev mode to import new blocks as best blocks.
	Dev(T),
	/// Used in parachain mode.
	Parachain(E),
}

/// A trait that must be implemented by all moon* runtime executors.
///
/// This allows, for instance, customizing the client extensions according to the type
/// of network.
/// For the moment, it is only used to specify the first block compatible with
/// ed25519-zebra, but it could be used for other things in the future.
pub trait ClientCustomizations {
	/// The host function ed25519_verify has changed its behavior during Substrate's history,
	/// because of the switch from the ed25519-dalek library to ed25519-zebra.
	/// Some networks may have old blocks that are not compatible with ed25519-zebra;
	/// for these networks this function should return the first block compatible with the new lib.
	/// If this function returns None (the default behavior), it implies that all blocks are
	/// compatible with the new lib (ed25519-zebra).
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		None
	}
}
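
// A minimal sketch (hypothetical network, for illustration only): a chain whose
// ed25519-dalek era ends at block 1_500_000 would override the default, just as
// the per-network implementations below do:
//
//     struct ExampleCustomizations;
//     impl ClientCustomizations for ExampleCustomizations {
//         fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
//             Some(1_500_000)
//         }
//     }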

#[cfg(feature = "moonbeam-native")]
pub struct MoonbeamCustomizations;
#[cfg(feature = "moonbeam-native")]
impl ClientCustomizations for MoonbeamCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(2_000_000)
	}
}

#[cfg(feature = "moonriver-native")]
pub struct MoonriverCustomizations;
#[cfg(feature = "moonriver-native")]
impl ClientCustomizations for MoonriverCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

#[cfg(feature = "moonbase-native")]
pub struct MoonbaseCustomizations;
#[cfg(feature = "moonbase-native")]
impl ClientCustomizations for MoonbaseCustomizations {
	fn first_block_number_compatible_with_ed25519_zebra() -> Option<u32> {
		Some(3_000_000)
	}
}

/// Trivial enum representing a runtime variant.
#[derive(Clone)]
pub enum RuntimeVariant {
	#[cfg(feature = "moonbeam-native")]
	Moonbeam,
	#[cfg(feature = "moonriver-native")]
	Moonriver,
	#[cfg(feature = "moonbase-native")]
	Moonbase,
	Unrecognized,
}

impl RuntimeVariant {
	pub fn from_chain_spec(chain_spec: &Box<dyn ChainSpec>) -> Self {
		match chain_spec {
			#[cfg(feature = "moonbeam-native")]
			spec if spec.is_moonbeam() => Self::Moonbeam,
			#[cfg(feature = "moonriver-native")]
			spec if spec.is_moonriver() => Self::Moonriver,
			#[cfg(feature = "moonbase-native")]
			spec if spec.is_moonbase() => Self::Moonbase,
			_ => Self::Unrecognized,
		}
	}
}

/// Can be called for a `ChainSpec` to check which network the given chain
/// specification belongs to.
pub trait IdentifyVariant {
	/// Returns `true` if this is a configuration for the `Moonbase` network.
	fn is_moonbase(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonbeam` network.
	fn is_moonbeam(&self) -> bool;

	/// Returns `true` if this is a configuration for the `Moonriver` network.
	fn is_moonriver(&self) -> bool;

	/// Returns `true` if this is a configuration for a dev network.
	fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn ChainSpec> {
	fn is_moonbase(&self) -> bool {
		self.id().starts_with("moonbase")
	}

	fn is_moonbeam(&self) -> bool {
		self.id().starts_with("moonbeam")
	}

	fn is_moonriver(&self) -> bool {
		self.id().starts_with("moonriver")
	}

	fn is_dev(&self) -> bool {
		self.chain_type() == sc_chain_spec::ChainType::Development
	}
}
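
// Usage sketch: these helpers drive runtime selection from the chain spec id.
// For example, `new_chain_ops` below dispatches on them:
//
//     if config.chain_spec.is_moonriver() {
//         // build the service against moonriver_runtime::RuntimeApi
//     }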

pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
	config
		.base_path
		.config_dir(config.chain_spec.id())
		.join("frontier")
		.join(path)
}
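
// Illustration (assuming Substrate's usual `<base>/chains/<chain-id>` config
// layout): with chain id `moonbase_alpha`, `frontier_database_dir(&config, "db")`
// resolves to something like `<base-path>/chains/moonbase_alpha/frontier/db`.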

// TODO This is copied from frontier. It should be imported instead after
// https://github.com/paritytech/frontier/issues/333 is solved
pub fn open_frontier_backend<C, BE>(
	client: Arc<C>,
	config: &Configuration,
	rpc_config: &RpcConfig,
) -> Result<fc_db::Backend<Block, C>, String>
where
	C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
	C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError>,
	C: Send + Sync + 'static,
	C::Api: fp_rpc::EthereumRuntimeRPCApi<Block>,
	BE: Backend<Block> + 'static,
	BE::State: StateBackend<BlakeTwo256>,
{
	let frontier_backend = match rpc_config.frontier_backend_config {
		FrontierBackendConfig::KeyValue => {
			fc_db::Backend::KeyValue(Arc::new(fc_db::kv::Backend::<Block, C>::new(
				client,
				&fc_db::kv::DatabaseSettings {
					source: match config.database {
						DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
							path: frontier_database_dir(config, "db"),
							cache_size: 0,
						},
						DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
							path: frontier_database_dir(config, "paritydb"),
						},
						DatabaseSource::Auto { .. } => DatabaseSource::Auto {
							rocksdb_path: frontier_database_dir(config, "db"),
							paritydb_path: frontier_database_dir(config, "paritydb"),
							cache_size: 0,
						},
						_ => {
							return Err(
								"Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string()
							)
						}
					},
				},
			)?))
		}
		FrontierBackendConfig::Sql {
			pool_size,
			num_ops_timeout,
			thread_count,
			cache_size,
		} => {
			let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
			let sqlite_db_path = frontier_database_dir(config, "sql");
			std::fs::create_dir_all(&sqlite_db_path).expect("failed creating sql db directory");
			let backend = futures::executor::block_on(fc_db::sql::Backend::new(
				fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
					path: Path::new("sqlite:///")
						.join(sqlite_db_path)
						.join("frontier.db3")
						.to_str()
						.expect("frontier sql path error"),
					create_if_missing: true,
					thread_count: thread_count,
					cache_size: cache_size,
				}),
				pool_size,
				std::num::NonZeroU32::new(num_ops_timeout),
				overrides.clone(),
			))
			.unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err));
			fc_db::Backend::Sql(Arc::new(backend))
		}
	};

	Ok(frontier_backend)
}
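
// Usage sketch, mirroring the call made from `new_partial` below:
//
//     let frontier_backend =
//         Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);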

use sp_runtime::{traits::BlakeTwo256, DigestItem, Percent};

pub const SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(100);

/// Builds a new object suitable for chain operations.
#[allow(clippy::type_complexity)]
pub fn new_chain_ops(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
> {
	match &config.chain_spec {
		#[cfg(feature = "moonriver-native")]
		spec if spec.is_moonriver() => new_chain_ops_inner::<
			moonriver_runtime::RuntimeApi,
			MoonriverCustomizations,
		>(config, rpc_config),
		#[cfg(feature = "moonbeam-native")]
		spec if spec.is_moonbeam() => new_chain_ops_inner::<
			moonbeam_runtime::RuntimeApi,
			MoonbeamCustomizations,
		>(config, rpc_config),
		#[cfg(feature = "moonbase-native")]
		_ => new_chain_ops_inner::<moonbase_runtime::RuntimeApi, MoonbaseCustomizations>(
			config, rpc_config,
		),
		#[cfg(not(feature = "moonbase-native"))]
		_ => panic!("invalid chain spec"),
	}
}

#[allow(clippy::type_complexity)]
fn new_chain_ops_inner<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
) -> Result<
	(
		Arc<Client>,
		Arc<FullBackend>,
		sc_consensus::BasicQueue<Block>,
		TaskManager,
	),
	ServiceError,
>
where
	Client: From<Arc<crate::FullClient<RuntimeApi>>>,
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	config.keystore = sc_service::config::KeystoreConfig::InMemory;
	let PartialComponents {
		client,
		backend,
		import_queue,
		task_manager,
		..
	} = new_partial::<RuntimeApi, Customizations>(config, rpc_config, config.chain_spec.is_dev())?;
	Ok((
		Arc::new(Client::from(client)),
		backend,
		import_queue,
		task_manager,
	))
}

// If we're using Prometheus, use a registry with a prefix of `moonbeam`.
fn set_prometheus_registry(
	config: &mut Configuration,
	skip_prefix: bool,
) -> Result<(), ServiceError> {
	if let Some(PrometheusConfig { registry, .. }) = config.prometheus_config.as_mut() {
		let labels = hashmap! {
			"chain".into() => config.chain_spec.id().into(),
		};
		let prefix = if skip_prefix {
			None
		} else {
			Some("moonbeam".into())
		};

		*registry = Registry::new_custom(prefix, Some(labels))?;
	}

	Ok(())
}
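
// Effect sketch (exercised by the unit tests at the bottom of this file): with
// the prefix enabled, a counter registered as `my_counter` is exported as
// `moonbeam_my_counter`, and every metric carries a `chain` label holding the
// chain spec id (e.g. `chain="moonriver"`).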

/// Builds the PartialComponents for a parachain or development service.
///
/// Use this function if you don't actually need the full service, but just the partial
/// components, in order to be able to perform chain operations.
#[allow(clippy::type_complexity)]
pub fn new_partial<RuntimeApi, Customizations>(
	config: &mut Configuration,
	rpc_config: &RpcConfig,
	dev_service: bool,
) -> PartialComponentsResult<RuntimeApi>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	set_prometheus_registry(config, rpc_config.no_prometheus_prefix)?;

	// Use Ethereum-style IDs for RPC subscriptions
	config.rpc_id_provider = Some(Box::new(fc_rpc::EthereumSubIdProvider));

	let telemetry = config
		.telemetry_endpoints
		.clone()
		.filter(|x| !x.is_empty())
		.map(|endpoints| -> Result<_, sc_telemetry::Error> {
			let worker = TelemetryWorker::new(16)?;
			let telemetry = worker.handle().new_telemetry(endpoints);
			Ok((worker, telemetry))
		})
		.transpose()?;

	let heap_pages = config
		.default_heap_pages
		.map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
			extra_pages: h as _,
		});
	let mut wasm_builder = WasmExecutor::builder()
		.with_execution_method(config.wasm_method)
		.with_onchain_heap_alloc_strategy(heap_pages)
		.with_offchain_heap_alloc_strategy(heap_pages)
		.with_ignore_onchain_heap_pages(true)
		.with_max_runtime_instances(config.max_runtime_instances)
		.with_runtime_cache_size(config.runtime_cache_size);

	if let Some(ref wasmtime_precompiled_path) = config.wasmtime_precompiled {
		wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
	}

	let executor = wasm_builder.build();

	let (client, backend, keystore_container, task_manager) =
		sc_service::new_full_parts::<Block, RuntimeApi, _>(
			config,
			telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
			executor,
		)?;

	if let Some(block_number) = Customizations::first_block_number_compatible_with_ed25519_zebra() {
		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(block_number));
	}

	let client = Arc::new(client);

	let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

	let telemetry = telemetry.map(|(worker, telemetry)| {
		task_manager
			.spawn_handle()
			.spawn("telemetry", None, worker.run());
		telemetry
	});

	let maybe_select_chain = if dev_service {
		Some(sc_consensus::LongestChain::new(backend.clone()))
	} else {
		None
	};

	let transaction_pool = sc_transaction_pool::BasicPool::new_full(
		config.transaction_pool.clone(),
		config.role.is_authority().into(),
		config.prometheus_registry(),
		task_manager.spawn_essential_handle(),
		client.clone(),
	);

	let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
	let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));

	let frontier_backend = Arc::new(open_frontier_backend(client.clone(), config, rpc_config)?);
	let frontier_block_import = FrontierBlockImport::new(client.clone(), client.clone());

	let create_inherent_data_providers = move |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();
		Ok((time,))
	};

	let (import_queue, block_import) = if dev_service {
		(
			nimbus_consensus::import_queue(
				client.clone(),
				frontier_block_import.clone(),
				create_inherent_data_providers,
				&task_manager.spawn_essential_handle(),
				config.prometheus_registry(),
				!dev_service,
			)?,
			BlockImportPipeline::Dev(frontier_block_import),
		)
	} else {
		let parachain_block_import = ParachainBlockImport::new_with_delayed_best_block(
			frontier_block_import,
			backend.clone(),
		);
		(
			nimbus_consensus::import_queue(
				client.clone(),
				parachain_block_import.clone(),
				create_inherent_data_providers,
				&task_manager.spawn_essential_handle(),
				config.prometheus_registry(),
				!dev_service,
			)?,
			BlockImportPipeline::Parachain(parachain_block_import),
		)
	};
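
	// Note on the two pipelines above: the dev service imports blocks straight
	// through the Frontier block import (each new block immediately becomes the
	// best block), while the parachain service additionally wraps it in a
	// `ParachainBlockImport` built with `new_with_delayed_best_block`, so that
	// the best block is driven by the relay chain rather than by local import.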

	Ok(PartialComponents {
		backend,
		client,
		import_queue,
		keystore_container,
		task_manager,
		transaction_pool,
		select_chain: maybe_select_chain,
		other: (
			block_import,
			filter_pool,
			telemetry,
			telemetry_worker_handle,
			frontier_backend,
			fee_history_cache,
		),
	})
}
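
// Usage sketch: both the chain-ops path and the node services call into
// `new_partial`, e.g. (as `new_chain_ops_inner` above does):
//
//     let PartialComponents { client, backend, import_queue, task_manager, .. } =
//         new_partial::<RuntimeApi, Customizations>(config, rpc_config, config.chain_spec.is_dev())?;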

async fn build_relay_chain_interface(
	polkadot_config: Configuration,
	parachain_config: &Configuration,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	task_manager: &mut TaskManager,
	collator_options: CollatorOptions,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> RelayChainResult<(
	Arc<(dyn RelayChainInterface + 'static)>,
	Option<CollatorPair>,
)> {
	if let cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =
		collator_options.relay_chain_mode
	{
		build_minimal_relay_chain_node_with_rpc(polkadot_config, task_manager, rpc_target_urls)
			.await
	} else {
		build_inprocess_relay_chain(
			polkadot_config,
			parachain_config,
			telemetry_worker_handle,
			task_manager,
			hwbench,
		)
	}
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("🌗")]
async fn start_node_impl<RuntimeApi, Customizations, Net>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	async_backing: bool,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	let mut parachain_config = prepare_node_config(parachain_config);

	let params =
		new_partial::<RuntimeApi, Customizations>(&mut parachain_config, &rpc_config, false)?;
	let (
		block_import,
		filter_pool,
		mut telemetry,
		telemetry_worker_handle,
		frontier_backend,
		fee_history_cache,
	) = params.other;

	let client = params.client.clone();
	let backend = params.backend.clone();
	let mut task_manager = params.task_manager;

	let (relay_chain_interface, collator_key) = build_relay_chain_interface(
		polkadot_config,
		&parachain_config,
		telemetry_worker_handle,
		&mut task_manager,
		collator_options.clone(),
		hwbench.clone(),
	)
	.await
	.map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

	let force_authoring = parachain_config.force_authoring;
	let collator = parachain_config.role.is_authority();
	let prometheus_registry = parachain_config.prometheus_registry().cloned();
	let transaction_pool = params.transaction_pool.clone();
	let import_queue_service = params.import_queue.service();
	let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);

	let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
		cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
			parachain_config: &parachain_config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue: params.import_queue,
			para_id,
			relay_chain_interface: relay_chain_interface.clone(),
			net_config,
			sybil_resistance_level: CollatorSybilResistance::Resistant,
		})
		.await?;

	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;

	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import, and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native Substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);

	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry.clone(),
	));

	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let filter_pool = filter_pool.clone();
		let frontier_backend = frontier_backend.clone();
		let backend = backend.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = params.keystore_container.keystore();
		move |deny_unsafe, subscription_task_executor| {
			#[cfg(feature = "moonbase-native")]
			let forced_parent_hashes = {
				let mut forced_parent_hashes = BTreeMap::new();
				// Fixes for https://github.com/paritytech/frontier/pull/570
				// #1648995
				forced_parent_hashes.insert(
					H256::from_str(
						"0xa352fee3eef9c554a31ec0612af887796a920613358abf3353727760ea14207b",
					)
					.expect("must be valid hash"),
					H256::from_str(
						"0x0d0fd88778aec08b3a83ce36387dbf130f6f304fc91e9a44c9605eaf8a80ce5d",
					)
					.expect("must be valid hash"),
				);
				Some(forced_parent_hashes)
			};
			#[cfg(not(feature = "moonbase-native"))]
			let forced_parent_hashes = None;

			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: None,
				deny_unsafe,
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: match &*frontier_backend {
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
				graph: pool.pool().clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				xcm_senders: None,
				block_data_cache: block_data_cache.clone(),
				overrides: overrides.clone(),
				forced_parent_hashes,
			};
			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			}
		}
	};

	sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		rpc_builder: Box::new(rpc_builder),
		client: client.clone(),
		transaction_pool: transaction_pool.clone(),
		task_manager: &mut task_manager,
		config: parachain_config,
		keystore: params.keystore_container.keystore(),
		backend: backend.clone(),
		network: network.clone(),
		sync_service: sync_service.clone(),
		system_rpc_tx,
		tx_handler_controller,
		telemetry: telemetry.as_mut(),
	})?;

	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);

		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	let announce_block = {
		let sync_service = sync_service.clone();
		Arc::new(move |hash, data| sync_service.announce_block(hash, data))
	};

	let relay_chain_slot_duration = Duration::from_secs(6);
	let overseer_handle = relay_chain_interface
		.overseer_handle()
		.map_err(|e| sc_service::Error::Application(Box::new(e)))?;

	start_relay_chain_tasks(StartRelayChainTasksParams {
		client: client.clone(),
		announce_block: announce_block.clone(),
		para_id,
		relay_chain_interface: relay_chain_interface.clone(),
		task_manager: &mut task_manager,
		da_recovery_profile: if collator {
			DARecoveryProfile::Collator
		} else {
			DARecoveryProfile::FullNode
		},
		import_queue: import_queue_service,
		relay_chain_slot_duration,
		recovery_handle: Box::new(overseer_handle.clone()),
		sync_service: sync_service.clone(),
	})?;

	let BlockImportPipeline::Parachain(block_import) = block_import else {
		return Err(sc_service::Error::Other(
			"Block import pipeline is not for parachain".into(),
		));
	};

	if collator {
		start_consensus::<RuntimeApi, _>(
			async_backing,
			backend.clone(),
			client.clone(),
			block_import,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|t| t.handle()),
			&task_manager,
			relay_chain_interface.clone(),
			transaction_pool,
			params.keystore_container.keystore(),
			para_id,
			collator_key.expect("Command line arguments do not allow this. qed"),
			overseer_handle,
			announce_block,
			force_authoring,
			relay_chain_slot_duration,
			block_authoring_duration,
			sync_service.clone(),
		)?;
		/*let parachain_consensus = build_consensus(
			client.clone(),
			backend,
			block_import,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|t| t.handle()),
			&task_manager,
			relay_chain_interface.clone(),
			transaction_pool,
			sync_service.clone(),
			params.keystore_container.keystore(),
			force_authoring,
		)?;

		let spawner = task_manager.spawn_handle();

		let params = StartCollatorParams {
			para_id,
			block_status: client.clone(),
			announce_block,
			client: client.clone(),
			task_manager: &mut task_manager,
			relay_chain_interface,
			spawner,
			parachain_consensus,
			import_queue: import_queue_service,
			recovery_handle: Box::new(overseer_handle),
			collator_key: collator_key.ok_or(sc_service::error::Error::Other(
				"Collator Key is None".to_string(),
			))?,
			relay_chain_slot_duration,
			sync_service,
		};

		#[allow(deprecated)]
		start_collator(params).await?;*/
	}

	start_network.start_network();

	Ok((task_manager, client))
}

fn start_consensus<RuntimeApi, SO>(
	async_backing: bool,
	backend: Arc<FullBackend>,
	client: Arc<FullClient<RuntimeApi>>,
	block_import: ParachainBlockImport<RuntimeApi>,
	prometheus_registry: Option<&Registry>,
	telemetry: Option<TelemetryHandle>,
	task_manager: &TaskManager,
	relay_chain_interface: Arc<dyn RelayChainInterface>,
	transaction_pool: Arc<sc_transaction_pool::FullPool<Block, FullClient<RuntimeApi>>>,
	keystore: KeystorePtr,
	para_id: ParaId,
	collator_key: CollatorPair,
	overseer_handle: OverseerHandle,
	announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
	force_authoring: bool,
	relay_chain_slot_duration: Duration,
	block_authoring_duration: Duration,
	sync_oracle: SO,
) -> Result<(), sc_service::Error>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	sc_client_api::StateBackendFor<TFullBackend<Block>, Block>:
		sc_client_api::StateBackend<BlakeTwo256>,
	SO: SyncOracle + Send + Sync + Clone + 'static,
{
	let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
		task_manager.spawn_handle(),
		client.clone(),
		transaction_pool,
		prometheus_registry,
		telemetry.clone(),
	);

	let proposer = Proposer::new(proposer_factory);

	let collator_service = CollatorService::new(
		client.clone(),
		Arc::new(task_manager.spawn_handle()),
		announce_block,
		client.clone(),
	);

	let create_inherent_data_providers = |_, _| async move {
		let time = sp_timestamp::InherentDataProvider::from_system_time();

		let author = nimbus_primitives::InherentDataProvider;

		let randomness = session_keys_primitives::InherentDataProvider;

		Ok((time, author, randomness))
	};

	let client_clone = client.clone();
	let keystore_clone = keystore.clone();
	let maybe_provide_vrf_digest =
		move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
			moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
				&client_clone,
				&keystore_clone,
				nimbus_id,
				parent,
			)
		};
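
	// Note (descriptive only): two collator modes are wired up below. With
	// `async_backing` enabled, the nimbus "lookahead" collator is spawned, which
	// is built for relay chains running asynchronous backing; otherwise the
	// "basic" collator authors in the classic one-block-per-relay-block fashion.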
	if async_backing {
		log::info!("Collator started with asynchronous backing.");
		let client_clone = client.clone();
		let code_hash_provider = move |block_hash| {
			client_clone
				.code_at(block_hash)
				.ok()
				.map(polkadot_primitives::ValidationCode)
				.map(|c| c.hash())
		};
		task_manager.spawn_essential_handle().spawn(
			"nimbus",
			None,
			nimbus_consensus::collators::lookahead::run::<
				Block,
				_,
				_,
				_,
				FullBackend,
				_,
				_,
				_,
				_,
				_,
				_,
			>(nimbus_consensus::collators::lookahead::Params {
				additional_digests_provider: maybe_provide_vrf_digest,
				additional_relay_keys: vec![
					moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW.to_vec(),
				],
				authoring_duration: block_authoring_duration,
				block_import,
				code_hash_provider,
				collator_key,
				collator_service,
				create_inherent_data_providers,
				force_authoring,
				keystore,
				overseer_handle,
				para_backend: backend,
				para_client: client,
				para_id,
				proposer,
				relay_chain_slot_duration,
				relay_client: relay_chain_interface,
				slot_duration: None,
				sync_oracle,
				reinitialize: false,
			}),
		);
	} else {
		log::info!("Collator started without asynchronous backing.");
		task_manager.spawn_essential_handle().spawn(
			"nimbus",
			None,
			nimbus_consensus::collators::basic::run::<Block, _, _, FullBackend, _, _, _, _, _>(
				nimbus_consensus::collators::basic::Params {
					additional_digests_provider: maybe_provide_vrf_digest,
					additional_relay_keys: vec![
						moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW.to_vec(),
					],
					//authoring_duration: Duration::from_millis(500),
					block_import,
					collator_key,
					collator_service,
					create_inherent_data_providers,
					force_authoring,
					keystore,
					overseer_handle,
					para_id,
					para_client: client,
					proposer,
					relay_client: relay_chain_interface,
				},
			),
		);
	};
	Ok(())
}

/// Start a normal parachain node.
// Rustfmt wants to format the closure with space indentation.
#[rustfmt::skip]
pub async fn start_node<RuntimeApi, Customizations>(
	parachain_config: Configuration,
	polkadot_config: Configuration,
	collator_options: CollatorOptions,
	para_id: ParaId,
	rpc_config: RpcConfig,
	async_backing: bool,
	block_authoring_duration: Duration,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<FullClient<RuntimeApi>>)>
where
	RuntimeApi:
		ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi:
		RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
{
	start_node_impl::<RuntimeApi, Customizations, sc_network::NetworkWorker<_, _>>(
		parachain_config,
		polkadot_config,
		collator_options,
		para_id,
		rpc_config,
		async_backing,
		block_authoring_duration,
		hwbench,
	)
	.await
}

/// Builds a new development service. This service uses manual seal, and mocks
/// the parachain inherent.
pub async fn new_dev<RuntimeApi, Customizations, Net>(
	mut config: Configuration,
	_author_id: Option<NimbusId>,
	sealing: moonbeam_cli_opt::Sealing,
	rpc_config: RpcConfig,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, ServiceError>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
	Customizations: ClientCustomizations + 'static,
	Net: NetworkBackend<Block, Hash>,
{
	use async_io::Timer;
	use futures::Stream;
	use sc_consensus_manual_seal::{run_manual_seal, EngineCommand, ManualSealParams};

	let sc_service::PartialComponents {
		client,
		backend,
		mut task_manager,
		import_queue,
		keystore_container,
		select_chain: maybe_select_chain,
		transaction_pool,
		other:
			(
				block_import_pipeline,
				filter_pool,
				mut telemetry,
				_telemetry_worker_handle,
				frontier_backend,
				fee_history_cache,
			),
	} = new_partial::<RuntimeApi, Customizations>(&mut config, &rpc_config, true)?;

	let block_import = if let BlockImportPipeline::Dev(block_import) = block_import_pipeline {
		block_import
	} else {
		return Err(ServiceError::Other(
			"Block import pipeline is not dev".to_string(),
		));
	};

	let net_config = FullNetworkConfiguration::<_, _, Net>::new(&config.network);

	let metrics = Net::register_notification_metrics(
		config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
	);
	let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
		sc_service::build_network(sc_service::BuildNetworkParams {
			config: &config,
			client: client.clone(),
			transaction_pool: transaction_pool.clone(),
			spawn_handle: task_manager.spawn_handle(),
			import_queue,
			block_announce_validator_builder: None,
			warp_sync_params: None,
			net_config,
			block_relay: None,
			metrics,
		})?;

	if config.offchain_worker.enabled {
		task_manager.spawn_handle().spawn(
			"offchain-workers-runner",
			"offchain-work",
			sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
				runtime_api_provider: client.clone(),
				keystore: Some(keystore_container.keystore()),
				offchain_db: backend.offchain_storage(),
				transaction_pool: Some(OffchainTransactionPoolFactory::new(
					transaction_pool.clone(),
				)),
				network_provider: Arc::new(network.clone()),
				is_validator: config.role.is_authority(),
				enable_http_requests: true,
				custom_extensions: move |_| vec![],
			})
			.run(client.clone(), task_manager.spawn_handle())
			.boxed(),
		);
	}

	let prometheus_registry = config.prometheus_registry().cloned();
	let overrides = Arc::new(StorageOverrideHandler::new(client.clone()));
	let fee_history_limit = rpc_config.fee_history_limit;
	let mut command_sink = None;
	let mut xcm_senders = None;
	let collator = config.role.is_authority();

	if collator {
		let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool.clone(),
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		);
		env.set_soft_deadline(SOFT_DEADLINE_PERCENT);
		// TODO: Need to cherry-pick
		//
		// https://github.com/moonbeam-foundation/substrate/commit/
		// d59476b362e38071d44d32c98c32fb35fd280930#diff-a1c022c97c7f9200cab161864c
		// 06d204f0c8b689955e42177731e232115e9a6f
		//
		// env.enable_ensure_proof_size_limit_after_each_extrinsic();

		let commands_stream: Box<dyn Stream<Item = EngineCommand<H256>> + Send + Sync + Unpin> =
			match sealing {
				moonbeam_cli_opt::Sealing::Instant => {
					Box::new(
						// This bit cribbed from the implementation of instant seal.
						transaction_pool
							.pool()
							.validated_pool()
							.import_notification_stream()
							.map(|_| EngineCommand::SealNewBlock {
								create_empty: false,
								finalize: false,
								parent_hash: None,
								sender: None,
							}),
					)
				}
				moonbeam_cli_opt::Sealing::Manual => {
					let (sink, stream) = futures::channel::mpsc::channel(1000);
					// Keep a reference to the other end of the channel. It goes to the RPC.
					command_sink = Some(sink);
					Box::new(stream)
				}
				moonbeam_cli_opt::Sealing::Interval(millis) => Box::new(StreamExt::map(
					Timer::interval(Duration::from_millis(millis)),
					|_| EngineCommand::SealNewBlock {
						create_empty: true,
						finalize: false,
						parent_hash: None,
						sender: None,
					},
				)),
			};
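
		// How the three sealing modes above behave (descriptive only):
		// - Instant: every transaction-pool import notification triggers a seal
		//   command, so a block is authored per incoming transaction.
		// - Manual: blocks are sealed on demand; the `command_sink` stored above
		//   is handed to the RPC layer, which feeds `EngineCommand`s into this
		//   stream.
		// - Interval(millis): a timer emits a seal command (allowing empty
		//   blocks) every `millis` milliseconds.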

		let select_chain = maybe_select_chain.expect(
			"`new_partial` builds a `LongestChainRule` when building dev service.\
				We specified the dev service when calling `new_partial`.\
				Therefore, a `LongestChainRule` is present. qed.",
		);

		let client_set_aside_for_cidp = client.clone();

		// Create channels for mocked XCM messages.
		let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
		let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
		xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

		let client_clone = client.clone();
		let keystore_clone = keystore_container.keystore().clone();
		let maybe_provide_vrf_digest =
			move |nimbus_id: NimbusId, parent: Hash| -> Option<sp_runtime::generic::DigestItem> {
				moonbeam_vrf::vrf_pre_digest::<Block, FullClient<RuntimeApi>>(
					&client_clone,
					&keystore_clone,
					nimbus_id,
					parent,
				)
			};
		task_manager.spawn_essential_handle().spawn_blocking(
			"authorship_task",
			Some("block-authoring"),
			run_manual_seal(ManualSealParams {
				block_import,
				env,
				client: client.clone(),
				pool: transaction_pool.clone(),
				commands_stream,
				select_chain,
				consensus_data_provider: Some(Box::new(NimbusManualSealConsensusDataProvider {
					keystore: keystore_container.keystore(),
					client: client.clone(),
					additional_digests_provider: maybe_provide_vrf_digest,
					_phantom: Default::default(),
				})),
				create_inherent_data_providers: move |block: H256, ()| {
					let maybe_current_para_block = client_set_aside_for_cidp.number(block);
					let maybe_current_para_head = client_set_aside_for_cidp.expect_header(block);
					let downward_xcm_receiver = downward_xcm_receiver.clone();
					let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

					let client_for_xcm = client_set_aside_for_cidp.clone();
					async move {
						let time = sp_timestamp::InherentDataProvider::from_system_time();
						let current_para_block = maybe_current_para_block?
							.ok_or(sp_blockchain::Error::UnknownBlock(block.to_string()))?;
						let current_para_block_head = Some(polkadot_primitives::HeadData(
							maybe_current_para_head?.encode(),
						));

						let additional_key_values = Some(vec![(
							moonbeam_core_primitives::well_known_relay_keys::TIMESTAMP_NOW.to_vec(),
							sp_timestamp::Timestamp::current().encode(),
						)]);

						let mocked_parachain = MockValidationDataInherentDataProvider {
							current_para_block,
							current_para_block_head,
							relay_offset: 1000,
							relay_blocks_per_para_block: 2,
							// TODO: Recheck
							para_blocks_per_relay_epoch: 10,
							relay_randomness_config: (),
							xcm_config: MockXcmConfig::new(
								&*client_for_xcm,
								block,
								Default::default(),
								Default::default(),
							),
							raw_downward_messages: downward_xcm_receiver.drain().collect(),
							raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
							additional_key_values,
						};

						let randomness = session_keys_primitives::InherentDataProvider;

						Ok((time, mocked_parachain, randomness))
					}
				},
			}),
		);
	}

	// Sinks for pubsub notifications.
	// Every time a new subscription is created, a new mpsc channel is added to the sink pool.
	// The MappingSyncWorker sends through the channel on block import, and the subscription emits a
	// notification to the subscriber on receiving a message through this channel.
	// This way we avoid race conditions when using the native Substrate block import notification
	// stream.
	let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
		fc_mapping_sync::EthereumBlockNotification<Block>,
	> = Default::default();
	let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

	rpc::spawn_essential_tasks(
		rpc::SpawnTasksParams {
			task_manager: &task_manager,
			client: client.clone(),
			substrate_backend: backend.clone(),
			frontier_backend: frontier_backend.clone(),
			filter_pool: filter_pool.clone(),
			overrides: overrides.clone(),
			fee_history_limit,
			fee_history_cache: fee_history_cache.clone(),
		},
		sync_service.clone(),
		pubsub_notification_sinks.clone(),
	);

	let ethapi_cmd = rpc_config.ethapi.clone();
	let tracing_requesters =
		if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
			rpc::tracing::spawn_tracing_tasks(
				&rpc_config,
				prometheus_registry.clone(),
				rpc::SpawnTasksParams {
					task_manager: &task_manager,
					client: client.clone(),
					substrate_backend: backend.clone(),
					frontier_backend: frontier_backend.clone(),
					filter_pool: filter_pool.clone(),
					overrides: overrides.clone(),
					fee_history_limit,
					fee_history_cache: fee_history_cache.clone(),
				},
			)
		} else {
			rpc::tracing::RpcRequesters {
				debug: None,
				trace: None,
			}
		};

	let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
		task_manager.spawn_handle(),
		overrides.clone(),
		rpc_config.eth_log_block_cache,
		rpc_config.eth_statuses_cache,
		prometheus_registry,
	));

	let rpc_builder = {
		let client = client.clone();
		let pool = transaction_pool.clone();
		let backend = backend.clone();
		let network = network.clone();
		let sync = sync_service.clone();
		let ethapi_cmd = ethapi_cmd.clone();
		let max_past_logs = rpc_config.max_past_logs;
		let overrides = overrides.clone();
		let fee_history_cache = fee_history_cache.clone();
		let block_data_cache = block_data_cache.clone();
		let pubsub_notification_sinks = pubsub_notification_sinks.clone();

		let keystore = keystore_container.keystore();
		move |deny_unsafe, subscription_task_executor| {
			let deps = rpc::FullDeps {
				backend: backend.clone(),
				client: client.clone(),
				command_sink: command_sink.clone(),
				deny_unsafe,
				ethapi_cmd: ethapi_cmd.clone(),
				filter_pool: filter_pool.clone(),
				frontier_backend: match &*frontier_backend {
					fc_db::Backend::KeyValue(b) => b.clone(),
					fc_db::Backend::Sql(b) => b.clone(),
				},
				graph: pool.pool().clone(),
				pool: pool.clone(),
				is_authority: collator,
				max_past_logs,
				fee_history_limit,
				fee_history_cache: fee_history_cache.clone(),
				network: network.clone(),
				sync: sync.clone(),
				xcm_senders: xcm_senders.clone(),
				overrides: overrides.clone(),
				block_data_cache: block_data_cache.clone(),
				forced_parent_hashes: None,
			};

			let pending_consensus_data_provider = Box::new(PendingConsensusDataProvider::new(
				client.clone(),
				keystore.clone(),
			));
			if ethapi_cmd.contains(&EthApiCmd::Debug) || ethapi_cmd.contains(&EthApiCmd::Trace) {
				rpc::create_full(
					deps,
					subscription_task_executor,
					Some(crate::rpc::TracingConfig {
						tracing_requesters: tracing_requesters.clone(),
						trace_filter_max_count: rpc_config.ethapi_trace_max_count,
					}),
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			} else {
				rpc::create_full(
					deps,
					subscription_task_executor,
					None,
					pubsub_notification_sinks.clone(),
					pending_consensus_data_provider,
				)
				.map_err(Into::into)
			}
		}
	};

	let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
		network,
		client,
		keystore: keystore_container.keystore(),
		task_manager: &mut task_manager,
		transaction_pool,
		rpc_builder: Box::new(rpc_builder),
		backend,
		system_rpc_tx,
		sync_service: sync_service.clone(),
		config,
		tx_handler_controller,
		telemetry: None,
	})?;

	if let Some(hwbench) = hwbench {
		sc_sysinfo::print_hwbench(&hwbench);
		if let Some(ref mut telemetry) = telemetry {
			let telemetry_handle = telemetry.handle();
			task_manager.spawn_handle().spawn(
				"telemetry_hwbench",
				None,
				sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench),
			);
		}
	}

	log::info!("Development Service Ready");

	network_starter.start_network();
	Ok(task_manager)
}

#[cfg(test)]
mod tests {
	use jsonrpsee::server::BatchRequestConfig;
	use moonbase_runtime::{currency::UNIT, AccountId};
	use prometheus::{proto::LabelPair, Counter};
	use sc_network::config::NetworkConfiguration;
	use sc_service::ChainType;
	use sc_service::{
		config::{BasePath, DatabaseSource, KeystoreConfig},
		Configuration, Role,
	};
	use std::path::Path;
	use std::str::FromStr;
	use crate::chain_spec::moonbase::{testnet_genesis, ChainSpec};
	use crate::chain_spec::Extensions;
	use super::*;
	#[test]
1
	fn test_set_prometheus_registry_uses_moonbeam_prefix() {
1
		let counter_name = "my_counter";
1
		let expected_metric_name = "moonbeam_my_counter";
1
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
1
		let mut config = Configuration {
1
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1
				"0.0.0.0:8080".parse().unwrap(),
1
				"".into(),
1
			)),
1
			..test_config("test")
1
		};
1

            
1
		set_prometheus_registry(&mut config, false).unwrap();
1
		// generate metric
1
		let reg = config.prometheus_registry().unwrap();
1
		reg.register(counter.clone()).unwrap();
1
		counter.inc();
1

            
1
		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
1
		assert_eq!(actual_metric_name.as_str(), expected_metric_name);
1
	}
	#[test]
1
	fn test_set_prometheus_registry_skips_moonbeam_prefix() {
1
		let counter_name = "my_counter";
1
		let counter = Box::new(Counter::new(counter_name, "foobar").unwrap());
1
		let mut config = Configuration {
1
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
1
				"0.0.0.0:8080".parse().unwrap(),
1
				"".into(),
1
			)),
1
			..test_config("test")
1
		};
1

            
1
		set_prometheus_registry(&mut config, true).unwrap();
1
		// generate metric
1
		let reg = config.prometheus_registry().unwrap();
1
		reg.register(counter.clone()).unwrap();
1
		counter.inc();
1

            
1
		let actual_metric_name = reg.gather().first().unwrap().get_name().to_string();
1
		assert_eq!(actual_metric_name.as_str(), counter_name);
1
	}
	#[test]
	fn test_set_prometheus_registry_adds_chain_id_as_label() {
		let input_chain_id = "moonriver";

		let mut expected_label = LabelPair::default();
		expected_label.set_name("chain".to_owned());
		expected_label.set_value("moonriver".to_owned());
		let expected_chain_label = Some(expected_label);

		let counter = Box::new(Counter::new("foo", "foobar").unwrap());
		let mut config = Configuration {
			prometheus_config: Some(PrometheusConfig::new_with_default_registry(
				"0.0.0.0:8080".parse().unwrap(),
				"".into(),
			)),
			..test_config(input_chain_id)
		};

		set_prometheus_registry(&mut config, false).unwrap();
		// generate metric
		let reg = config.prometheus_registry().unwrap();
		reg.register(counter.clone()).unwrap();
		counter.inc();

		let actual_chain_label = reg
			.gather()
			.first()
			.unwrap()
			.get_metric()
			.first()
			.unwrap()
			.get_label()
			.into_iter()
			.find(|x| x.get_name() == "chain")
			.cloned();

		assert_eq!(actual_chain_label, expected_chain_label);
	}
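
	// Regression test: ed25519-dalek >= 1.3 panics when an invalid signature
	// is constructed, so runtime verification must return `false` instead of
	// panicking once the `UseDalekExt` extension is active.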
	#[test]
	fn dalek_does_not_panic() {
		use futures::executor::block_on;
		use sc_block_builder::BlockBuilderBuilder;
		use sc_client_db::{Backend, BlocksPruning, DatabaseSettings, DatabaseSource, PruningMode};
		use sp_api::ProvideRuntimeApi;
		use sp_consensus::BlockOrigin;
		use substrate_test_runtime::TestAPI;
		use substrate_test_runtime_client::runtime::Block;
		use substrate_test_runtime_client::{
			ClientBlockImportExt, TestClientBuilder, TestClientBuilderExt,
		};

		fn zero_ed_pub() -> sp_core::ed25519::Public {
			sp_core::ed25519::Public::default()
		}

		// This is an invalid signature. Starting with ed25519-dalek 1.3,
		// constructing such a signature panics, so this test ensures that
		// verification never panics on malformed input.
		fn invalid_sig() -> sp_core::ed25519::Signature {
			let signature = hex_literal::hex!(
				"a25b94f9c64270fdfffa673f11cfe961633e3e4972e6940a3cf
		7351dd90b71447041a83583a52cee1cf21b36ba7fd1d0211dca58b48d997fc78d9bc82ab7a38e"
			);
			sp_core::ed25519::Signature::from_raw(signature[0..64].try_into().unwrap())
		}

		let tmp = tempfile::tempdir().unwrap();
		let backend = Arc::new(
			Backend::new(
				DatabaseSettings {
					trie_cache_maximum_size: Some(1 << 20),
					state_pruning: Some(PruningMode::ArchiveAll),
					blocks_pruning: BlocksPruning::KeepAll,
					source: DatabaseSource::RocksDb {
						path: tmp.path().into(),
						cache_size: 1024,
					},
				},
				u64::MAX,
			)
			.unwrap(),
		);
		let mut client = TestClientBuilder::with_backend(backend).build();

		client
			.execution_extensions()
			.set_extensions_factory(sc_client_api::execution_extensions::ExtensionBeforeBlock::<
			Block,
			sp_io::UseDalekExt,
		>::new(1));

		let a1 = BlockBuilderBuilder::new(&client)
			.on_parent_block(client.chain_info().genesis_hash)
			.with_parent_block_number(0)
			// Enable proof recording if required. This call is optional.
			.enable_proof_recording()
			.build()
			.unwrap()
			.build()
			.unwrap()
			.block;

		block_on(client.import(BlockOrigin::NetworkInitialSync, a1.clone())).unwrap();

		// On block zero the dalek host function is used; verifying an invalid
		// signature must return `false` rather than panic.
		assert!(!client
			.runtime_api()
			.verify_ed25519(
				client.chain_info().genesis_hash,
				invalid_sig(),
				zero_ed_pub(),
				vec![]
			)
			.unwrap());
	}
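
	// Minimal `Configuration` used by the tests above, built around a Moonbase
	// testnet genesis with placeholder paths and defaults everywhere else.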
	fn test_config(chain_id: &str) -> Configuration {
		let network_config = NetworkConfiguration::new("", "", Default::default(), None);
		let runtime = tokio::runtime::Runtime::new().expect("failed creating tokio runtime");
		let spec = ChainSpec::builder(&[0u8], Extensions::default())
			.with_name("test")
			.with_id(chain_id)
			.with_chain_type(ChainType::Local)
			.with_genesis_config(testnet_genesis(
				AccountId::from_str("6Be02d1d3665660d22FF9624b7BE0551ee1Ac91b").unwrap(),
				vec![],
				vec![],
				vec![],
				vec![],
				vec![],
				1000 * UNIT,
				ParaId::new(0),
				0,
			))
			.build();

		Configuration {
			impl_name: String::from("test-impl"),
			impl_version: String::from("0.1"),
			role: Role::Full,
			tokio_handle: runtime.handle().clone(),
			transaction_pool: Default::default(),
			network: network_config,
			keystore: KeystoreConfig::Path {
				path: "key".into(),
				password: None,
			},
			database: DatabaseSource::RocksDb {
				path: "db".into(),
				cache_size: 128,
			},
			trie_cache_maximum_size: Some(16777216),
			state_pruning: Default::default(),
			blocks_pruning: sc_service::BlocksPruning::KeepAll,
			chain_spec: Box::new(spec),
			wasm_method: Default::default(),
			wasm_runtime_overrides: Default::default(),
			rpc_id_provider: None,
			rpc_max_connections: Default::default(),
			rpc_cors: None,
			rpc_methods: Default::default(),
			rpc_max_request_size: Default::default(),
			rpc_max_response_size: Default::default(),
			rpc_max_subs_per_conn: Default::default(),
			rpc_addr: None,
			rpc_port: Default::default(),
			rpc_message_buffer_capacity: Default::default(),
			data_path: Default::default(),
			prometheus_config: None,
			telemetry_endpoints: None,
			default_heap_pages: None,
			offchain_worker: Default::default(),
			force_authoring: false,
			disable_grandpa: false,
			dev_key_seed: None,
			tracing_targets: None,
			tracing_receiver: Default::default(),
			max_runtime_instances: 8,
			announce_block: true,
			base_path: BasePath::new(Path::new("")),
			informant_output_format: Default::default(),
			wasmtime_precompiled: None,
			runtime_cache_size: 2,
			rpc_rate_limit: Default::default(),
			rpc_batch_config: BatchRequestConfig::Unlimited,
		}
	}
}
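
/// Provides the consensus digests Frontier needs when simulating the next
/// ("pending") block for RPC: the Nimbus pre-runtime digest is reused from the
/// best block and its VRF digest is replaced with a freshly generated one.
///
/// A minimal usage sketch (assuming `client` and `keystore_container` are
/// already available, as in the dev service setup above):
///
/// ```ignore
/// let provider = Box::new(PendingConsensusDataProvider::new(
/// 	client.clone(),
/// 	keystore_container.keystore(),
/// ));
/// ```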
struct PendingConsensusDataProvider<RuntimeApi>
where
	RuntimeApi: Send + Sync,
{
	client: Arc<FullClient<RuntimeApi>>,
	keystore: Arc<dyn Keystore>,
}
impl<RuntimeApi> PendingConsensusDataProvider<RuntimeApi>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
{
	pub fn new(client: Arc<FullClient<RuntimeApi>>, keystore: Arc<dyn Keystore>) -> Self {
		Self { client, keystore }
	}
}
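
// Frontier invokes `create_digest` when it mocks authorship of the pending
// block exposed over RPC.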
impl<RuntimeApi> fc_rpc::pending::ConsensusDataProvider<Block>
	for PendingConsensusDataProvider<RuntimeApi>
where
	RuntimeApi: ConstructRuntimeApi<Block, FullClient<RuntimeApi>> + Send + Sync + 'static,
	RuntimeApi::RuntimeApi: RuntimeApiCollection,
{
	fn create_digest(
		&self,
		parent: &Header,
		_data: &sp_inherents::InherentData,
	) -> Result<sp_runtime::Digest, sp_inherents::Error> {
		let hash = parent.hash();
		// Get the digest from the parent block header (the current best block).
		let mut digest = self
			.client
			.header(hash)
			.map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
			.expect("Best block header should be present")
			.digest;
		// Get the nimbus id from the digest.
		let nimbus_id = digest
			.logs
			.iter()
			.find_map(|x| {
				if let DigestItem::PreRuntime(nimbus_primitives::NIMBUS_ENGINE_ID, nimbus_id) = x {
					Some(
						NimbusId::from_slice(nimbus_id.as_slice())
							.expect("Nimbus pre-runtime digest should be valid"),
					)
				} else {
					None
				}
			})
			.expect("Nimbus pre-runtime digest should be present");
		// Remove the old VRF digest.
		let pos = digest.logs.iter().position(|x| {
			matches!(
				x,
				DigestItem::PreRuntime(session_keys_primitives::VRF_ENGINE_ID, _)
			)
		});
		if let Some(pos) = pos {
			digest.logs.remove(pos);
		}
		// Create the VRF digest.
		let vrf_digest = VrfDigestsProvider::new(self.client.clone(), self.keystore.clone())
			.provide_digests(nimbus_id, hash);
		// Append the VRF digest to the digest.
		digest.logs.extend(vrf_digest);
		Ok(digest)
	}
}