diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs index bfbf2631e..7865e7e0b 100644 --- a/collector/src/bin/collector.rs +++ b/collector/src/bin/collector.rs @@ -21,6 +21,7 @@ use chrono::Utc; use clap::builder::TypedValueParser; use clap::{Arg, Parser}; use collector::compare::compare_artifacts; +use hashbrown::HashSet; use humansize::{format_size, BINARY}; use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use tabled::builder::Builder; @@ -33,6 +34,7 @@ use collector::api::next_artifact::NextArtifact; use collector::artifact_stats::{ compile_and_get_stats, ArtifactStats, ArtifactWithStats, CargoProfile, }; +use collector::benchmark_set::{expand_benchmark_set, BenchmarkSetId, BenchmarkSetMember}; use collector::codegen::{codegen_diff, CodegenType}; use collector::compile::benchmark::category::Category; use collector::compile::benchmark::codegen_backend::CodegenBackend; @@ -52,13 +54,18 @@ use collector::runtime::{ }; use collector::runtime::{profile_runtime, RuntimeCompilationOpts}; use collector::toolchain::{ - create_toolchain_from_published_version, get_local_toolchain, Sysroot, Toolchain, - ToolchainConfig, + create_toolchain_from_published_version, get_local_toolchain, Sysroot, SysrootDownloadError, + Toolchain, ToolchainConfig, }; use collector::utils::cachegrind::cachegrind_diff; use collector::utils::{is_installed, wait_for_future}; use collector::{utils, CollectorCtx, CollectorStepBuilder}; -use database::{ArtifactId, ArtifactIdNumber, Commit, CommitType, Connection, Pool}; +use database::{ + ArtifactId, ArtifactIdNumber, BenchmarkJob, BenchmarkJobConclusion, CollectorConfig, Commit, + CommitType, Connection, Pool, +}; + +const TOOLCHAIN_CACHE_DIRECTORY: &str = "cache"; fn n_normal_benchmarks_remaining(n: usize) -> String { let suffix = if n == 1 { "" } else { "s" }; @@ -124,6 +131,7 @@ impl RuntimeBenchmarkConfig { struct SharedBenchmarkConfig { artifact_id: ArtifactId, toolchain: Toolchain, + record_duration: bool, } fn check_measureme_installed() -> Result<(), String> { @@ -692,15 +700,14 @@ enum Commands { benchmark_set: u32, }, - /// Polls the job queue for work to benchmark - DequeueJob { - /// The unique identifier for the collector + /// Benchmark test cases pulled from the job queue. + BenchmarkJobQueue { + /// The unique identifier for the collector. + /// It has to exist in the database; you can create new collectors using the `add_collector` + /// command. 
        #[arg(long)]
        collector_name: String,

-        #[arg(long)]
-        target: String,
-
         #[command(flatten)]
         db: DbOption,
     },
@@ -830,13 +837,14 @@ fn main_result() -> anyhow::Result<i32> {
             let shared = SharedBenchmarkConfig {
                 artifact_id,
                 toolchain,
+                record_duration: true,
             };
             let config = RuntimeBenchmarkConfig::new(
                 runtime_suite,
                 RuntimeBenchmarkFilter::new(local.exclude, local.include),
                 iterations,
             );
-            rt.block_on(run_benchmarks(conn, shared, None, Some(config)))?;
+            rt.block_on(run_benchmarks(conn.as_mut(), shared, None, Some(config)))?;
             Ok(0)
         }
         Commands::ProfileRuntime {
@@ -972,6 +980,7 @@ fn main_result() -> anyhow::Result<i32> {
             let shared = SharedBenchmarkConfig {
                 toolchain,
                 artifact_id,
+                record_duration: true,
             };
             let config = CompileBenchmarkConfig {
                 benchmarks,
@@ -984,7 +993,7 @@ fn main_result() -> anyhow::Result<i32> {
                 targets: vec![Target::default()],
             };

-            rt.block_on(run_benchmarks(conn, shared, Some(config), None))?;
+            rt.block_on(run_benchmarks(conn.as_mut(), shared, Some(config), None))?;

             Ok(0)
         }
@@ -1063,7 +1072,13 @@ fn main_result() -> anyhow::Result<i32> {
             };
             let sha = commit.sha.to_string();
             let sysroot = rt
-                .block_on(Sysroot::install(sha.clone(), &host_target_tuple, &backends))
+                .block_on(Sysroot::install(
+                    Path::new(TOOLCHAIN_CACHE_DIRECTORY),
+                    sha.clone(),
+                    &host_target_tuple,
+                    &backends,
+                ))
+                .map_err(SysrootDownloadError::as_anyhow_error)
                 .with_context(|| format!("failed to install sysroot for {commit:?}"))?;

             let mut benchmarks = get_compile_benchmarks(
@@ -1112,10 +1127,11 @@ fn main_result() -> anyhow::Result<i32> {
             let shared = SharedBenchmarkConfig {
                 artifact_id,
                 toolchain,
+                record_duration: true,
             };
             rt.block_on(run_benchmarks(
-                conn,
+                conn.as_mut(),
                 shared,
                 Some(compile_config),
                 Some(runtime_config),
@@ -1254,11 +1270,14 @@ fn main_result() -> anyhow::Result<i32> {
             let commit = get_commit_or_fake_it(last_sha).expect("success");
             let rt = build_async_runtime();
-            let mut sysroot = rt.block_on(Sysroot::install(
-                commit.sha,
-                &host_target_tuple,
-                &codegen_backends.0,
-            ))?;
+            let mut sysroot = rt
+                .block_on(Sysroot::install(
+                    Path::new(TOOLCHAIN_CACHE_DIRECTORY),
+                    commit.sha,
+                    &host_target_tuple,
+                    &codegen_backends.0,
+                ))
+                .map_err(SysrootDownloadError::as_anyhow_error)?;
             sysroot.preserve(); // don't delete it

             // Print the directory containing the toolchain.
@@ -1333,45 +1352,281 @@ Make sure to modify `{dir}/perf-config.json` if the category/artifact don't matc
             Ok(0)
         }
-        Commands::DequeueJob {
-            collector_name,
-            db,
-            target,
-        } => {
+        Commands::BenchmarkJobQueue { collector_name, db } => {
+            log_db(&db);
+            let pool = Pool::open(&db.db);
             let rt = build_async_runtime();
             let conn = rt.block_on(pool.connection());

             // Obtain the configuration and validate that it matches the
-            // collector's setup
-            let collector_config: database::CollectorConfig = rt
+            // collector's host target
+            let collector_config = rt
                 .block_on(conn.get_collector_config(&collector_name))?
-                .unwrap();
-
-            let collector_target = collector_config.target();
-            if collector_target.as_str() != target {
-                panic!(
-                    "Mismatching target for collector expected `{collector_target}` got `{target}`"
-                );
+                .ok_or_else(|| {
+                    anyhow::anyhow!("Collector with name `{collector_name}` not found")
+                })?;
+
+            if collector_config.target().as_str() != host_target_tuple {
+                return Err(anyhow::anyhow!(
+                    "The collector `{collector_name}` is configured for target `{}`, but the current host target seems to be `{host_target_tuple}`",
+                    collector_config.target()
+                ));
             }

-            // Dequeue a job
-            let benchmark_job = rt.block_on(conn.dequeue_benchmark_job(
-                &collector_name,
-                collector_config.target(),
-                collector_config.benchmark_set(),
+            let benchmarks =
+                get_compile_benchmarks(&compile_benchmark_dir, CompileBenchmarkFilter::All)?;
+
+            rt.block_on(run_job_queue_benchmarks(
+                pool,
+                conn,
+                &collector_config,
+                benchmarks,
             ))?;

-            if let Some(benchmark_job) = benchmark_job {
-                // TODO; process the job
-                println!("{benchmark_job:?}");
+            Ok(0)
+        }
+    }
+}
+
+/// Maximum number of failures before a job will be marked as failed.
+const MAX_JOB_FAILS: u32 = 3;
+
+async fn run_job_queue_benchmarks(
+    pool: Pool,
+    mut conn: Box<dyn Connection>,
+    collector: &CollectorConfig,
+    all_compile_benchmarks: Vec<Benchmark>,
+) -> anyhow::Result<()> {
+    conn.update_collector_heartbeat(collector.name()).await?;
+
+    // TODO: check collector SHA vs site SHA
+    while let Some((benchmark_job, artifact_id)) = conn
+        .dequeue_benchmark_job(
+            collector.name(),
+            collector.target(),
+            collector.benchmark_set(),
+        )
+        .await?
+    {
+        log::info!("Dequeued job {benchmark_job:?}, artifact_id {artifact_id:?}");
+        let result = run_benchmark_job(
+            conn.as_mut(),
+            &benchmark_job,
+            artifact_id.clone(),
+            &all_compile_benchmarks,
+        )
+        .await;
+        match result {
+            Ok(_) => {
+                log::info!("Job finished successfully");
+                conn.mark_benchmark_job_as_completed(
+                    benchmark_job.id(),
+                    BenchmarkJobConclusion::Success,
+                )
+                .await?;
             }
+            Err(error) => {
+                match error {
+                    BenchmarkJobError::Permanent(error) => {
+                        log::error!("Job finished with permanent error: {error:?}");
+
+                        // Store the error to the database
+                        let artifact_row_id = conn.artifact_id(&artifact_id).await;
+                        // Use a placeholder to say that the error is associated with a job,
+                        // not with a benchmark.
+                        conn.record_error(
+                            artifact_row_id,
+                            &format!("job:{}", benchmark_job.id()),
+                            &format!("Error while benchmarking job {benchmark_job:?}: {error:?}"),
+                        )
+                        .await;
+
+                        // Something bad that probably cannot be retried has happened.
+                        // Immediately mark the job as failed and continue with other jobs.
+                        log::info!("Marking the job as failed");
+                        conn.mark_benchmark_job_as_completed(
+                            benchmark_job.id(),
+                            BenchmarkJobConclusion::Failure,
+                        )
+                        .await?;
+                    }
+                    BenchmarkJobError::Transient(error) => {
+                        log::error!("Job finished with transient error: {error:?}");

-            Ok(0)
+                        // There was some transient (e.g. I/O, network or database) error.
+                        // Let's retry the job later, after a short sleep.
+                        log::info!("Retrying after 30s...");
+                        tokio::time::sleep(Duration::from_secs(30)).await;
+
+                        // Maybe there was a DB issue. Try to reconnect to the database.
+                        conn = pool.connection().await;
+                    }
+                }
+            }
         }
+
+        conn.update_collector_heartbeat(collector.name()).await?;
+    }
+    log::info!("No job found, exiting");
+    Ok(())
+}
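
A note on the loop above: the conclusion reported for each job follows purely from the error classification and the dequeue count. A minimal standalone sketch of that policy, with stand-in types (`JobError`, `Decision`) that are not part of the patch:

#[derive(Debug)]
enum JobError {
    Permanent(String),
    Transient(String),
}

#[derive(Debug, PartialEq)]
enum Decision {
    MarkSuccess,
    MarkFailed,
    RetryLater,
}

// Mirrors the control flow of `run_job_queue_benchmarks`: permanent errors
// (or too many dequeues) fail the job immediately; transient errors leave it
// in progress so a later dequeue can pick it up again.
fn decide(result: Result<(), JobError>, dequeue_count: u32, max_fails: u32) -> Decision {
    match result {
        Ok(()) => Decision::MarkSuccess,
        Err(_) if dequeue_count > max_fails => Decision::MarkFailed,
        Err(JobError::Permanent(_)) => Decision::MarkFailed,
        Err(JobError::Transient(_)) => Decision::RetryLater,
    }
}

fn main() {
    let too_many = decide(Err(JobError::Transient("timeout".into())), 4, 3);
    assert_eq!(too_many, Decision::MarkFailed);
    let retry = decide(Err(JobError::Transient("timeout".into())), 1, 3);
    assert_eq!(retry, Decision::RetryLater);
}
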
+
+/// Error that happened during benchmarking of a job.
+enum BenchmarkJobError {
+    /// The error is non-recoverable.
+    /// For example, the requested rustc toolchain does not exist on CI.
+    Permanent(anyhow::Error),
+    /// The error is likely temporary (e.g. a network or database failure),
+    /// so the job can be retried later.
+    Transient(anyhow::Error),
+}
+
+impl From<anyhow::Error> for BenchmarkJobError {
+    fn from(error: anyhow::Error) -> Self {
+        Self::Transient(error)
     }
 }

+async fn run_benchmark_job(
+    conn: &mut dyn Connection,
+    job: &BenchmarkJob,
+    artifact_id: ArtifactId,
+    all_compile_benchmarks: &[Benchmark],
+) -> Result<(), BenchmarkJobError> {
+    // Fail the job if it has been dequeued too many times
+    if job.deque_count() > MAX_JOB_FAILS {
+        return Err(BenchmarkJobError::Permanent(anyhow::anyhow!(
+            "Job failed after being dequeued {MAX_JOB_FAILS} times"
+        )));
+    }
+
+    log::info!("Downloading sysroot");
+    let toolchain = match &artifact_id {
+        ArtifactId::Commit(commit) => {
+            let mut sysroot = match Sysroot::install(
+                Path::new(TOOLCHAIN_CACHE_DIRECTORY),
+                commit.sha.clone(),
+                job.target().as_str(),
+                &[job.backend().into()],
+            )
+            .await
+            {
+                Ok(sysroot) => sysroot,
+                Err(SysrootDownloadError::SysrootShaNotFound) => {
+                    return Err(BenchmarkJobError::Permanent(anyhow::anyhow!(
+                        "Artifacts for SHA {} and target {} were not found on CI servers",
+                        commit.sha,
+                        job.target().as_str()
+                    )))
+                }
+                Err(SysrootDownloadError::IO(error)) => return Err(error.into()),
+            };
+            // Avoid redownloading the same sysroot multiple times for different jobs, even
+            // across collector restarts.
+            // TODO: Periodically clear the cache directory to avoid running out of disk space.
+            sysroot.preserve();
+            Toolchain::from_sysroot(&sysroot, commit.sha.clone())
+        }
+        ArtifactId::Tag(tag) => {
+            create_toolchain_from_published_version(tag, job.target().as_str())?
+        }
+    };
+    log::info!("Sysroot download finished");
+
+    let (compile_config, runtime_config) =
+        create_benchmark_configs(conn, &toolchain, &artifact_id, job, all_compile_benchmarks)
+            .await
+            .map_err(|error| {
+                BenchmarkJobError::Permanent(anyhow::anyhow!(
+                    "Cannot prepare benchmark configs: {error:?}"
+                ))
+            })?;
+
+    let shared = SharedBenchmarkConfig {
+        artifact_id,
+        toolchain,
+        record_duration: false,
+    };
+
+    // A failure here means that it was not possible to compile something; that likely
+    // won't resolve itself automatically.
+    run_benchmarks(conn, shared, compile_config, runtime_config)
+        .await
+        .map_err(|error| {
+            BenchmarkJobError::Permanent(anyhow::anyhow!("Cannot run benchmarks: {error:?}"))
+        })?;
+    Ok(())
+}
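
Because of the `From<anyhow::Error>` impl above, a bare `?` inside `run_benchmark_job` classifies any `anyhow` failure as transient; permanent errors always have to be constructed explicitly, as the sysroot-download match does. A self-contained sketch of that default (stand-in names, same shape as `BenchmarkJobError`):

use anyhow::anyhow;

enum JobError {
    Permanent(anyhow::Error),
    Transient(anyhow::Error),
}

impl From<anyhow::Error> for JobError {
    fn from(error: anyhow::Error) -> Self {
        Self::Transient(error)
    }
}

fn flaky_io_step() -> anyhow::Result<()> {
    Err(anyhow!("connection reset")) // e.g. a network or database hiccup
}

fn job_step() -> Result<(), JobError> {
    // `?` goes through the From impl, so this failure is Transient by default;
    // Permanent must always be spelled out.
    flaky_io_step()?;
    Ok(())
}

fn main() {
    assert!(matches!(job_step(), Err(JobError::Transient(_))));
}
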
+
+async fn create_benchmark_configs(
+    conn: &mut dyn Connection,
+    toolchain: &Toolchain,
+    artifact_id: &ArtifactId,
+    job: &BenchmarkJob,
+    all_compile_benchmarks: &[Benchmark],
+) -> anyhow::Result<(
+    Option<CompileBenchmarkConfig>,
+    Option<RuntimeBenchmarkConfig>,
+)> {
+    // Expand the benchmark set and figure out which benchmarks should be executed
+    let benchmark_set = BenchmarkSetId::new(job.target().into(), job.benchmark_set().get_id());
+    let benchmark_set_members = expand_benchmark_set(benchmark_set);
+    log::debug!("Expanded benchmark set members: {benchmark_set_members:?}");
+
+    let mut bench_rustc = false;
+    let mut bench_runtime = false;
+    let mut bench_compile_benchmarks = HashSet::new();
+    for member in benchmark_set_members {
+        match member {
+            BenchmarkSetMember::CompileBenchmark(benchmark) => {
+                bench_compile_benchmarks.insert(benchmark);
+            }
+            BenchmarkSetMember::RuntimeBenchmarks => bench_runtime = true,
+            BenchmarkSetMember::Rustc => bench_rustc = true,
+        }
+    }
+
+    let compile_config = if bench_rustc || !bench_compile_benchmarks.is_empty() {
+        Some(CompileBenchmarkConfig {
+            benchmarks: all_compile_benchmarks
+                .iter()
+                .filter(|b| bench_compile_benchmarks.contains(&b.name))
+                .cloned()
+                .collect(),
+            profiles: vec![job.profile().into()],
+            scenarios: Scenario::all(),
+            backends: vec![job.backend().into()],
+            iterations: None,
+            is_self_profile: true,
+            bench_rustc,
+            targets: vec![job.target().into()],
+        })
+    } else {
+        None
+    };
+
+    let runtime_config = if bench_runtime {
+        let runtime_suite = load_runtime_benchmarks(
+            conn,
+            &runtime_benchmark_dir(),
+            CargoIsolationMode::Isolated,
+            None,
+            toolchain,
+            artifact_id,
+        )
+        .await?;
+        Some(RuntimeBenchmarkConfig {
+            runtime_suite,
+            filter: RuntimeBenchmarkFilter::keep_all(),
+            iterations: DEFAULT_RUNTIME_ITERATIONS,
+        })
+    } else {
+        None
+    };
+
+    Ok((compile_config, runtime_config))
+}
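
One subtlety in `create_benchmark_configs`: a benchmark set containing only `Rustc` still yields a `CompileBenchmarkConfig`, just with an empty `benchmarks` list, because the rustc bootstrap benchmark rides on the compile-benchmark machinery. A standalone illustration of the condition (plain values stand in for the expanded set):

fn needs_compile_config(bench_rustc: bool, n_compile_benchmarks: usize) -> bool {
    // Same shape as `bench_rustc || !bench_compile_benchmarks.is_empty()` above.
    bench_rustc || n_compile_benchmarks > 0
}

fn main() {
    // Rustc-only set: compile config with an empty benchmark list.
    assert!(needs_compile_config(true, 0));
    // Runtime-only set: no compile config at all.
    assert!(!needs_compile_config(false, 0));
}
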
+
 fn binary_stats_local(args: BinaryStatsLocal, symbols: bool) -> anyhow::Result<()> {
     let stats = ArtifactStats::from_path(&args.artifact)
         .with_context(|| format!("Cannot load artifact from {}", args.artifact.display()))?;
@@ -1759,26 +2014,20 @@ async fn init_collection(

 /// Execute all benchmarks specified by the given configurations.
 async fn run_benchmarks(
-    mut connection: Box<dyn Connection>,
+    connection: &mut dyn Connection,
     shared: SharedBenchmarkConfig,
     compile: Option<CompileBenchmarkConfig>,
     runtime: Option<RuntimeBenchmarkConfig>,
 ) -> anyhow::Result<()> {
-    record_toolchain_sizes(connection.as_mut(), &shared.artifact_id, &shared.toolchain).await;
+    record_toolchain_sizes(connection, &shared.artifact_id, &shared.toolchain).await;

-    let collector = init_collection(
-        connection.as_mut(),
-        &shared,
-        compile.as_ref(),
-        runtime.as_ref(),
-    )
-    .await;
+    let collector = init_collection(connection, &shared, compile.as_ref(), runtime.as_ref()).await;

     let start = Instant::now();

     // Compile benchmarks
     let compile_result = if let Some(compile) = compile {
-        let errors = bench_compile(connection.as_mut(), &shared, compile, &collector).await;
+        let errors = bench_compile(connection, &shared, compile, &collector).await;
         errors
             .fail_if_nonzero()
             .context("Compile benchmarks failed")
@@ -1789,7 +2038,7 @@ async fn run_benchmarks(
     // Runtime benchmarks
     let runtime_result = if let Some(runtime) = runtime {
         bench_runtime(
-            connection.as_mut(),
+            connection,
             runtime.runtime_suite,
             &collector,
             runtime.filter,
@@ -1801,10 +2050,12 @@ async fn run_benchmarks(
         Ok(())
     };

-    let end = start.elapsed();
-    connection
-        .record_duration(collector.artifact_row_id, end)
-        .await;
+    if shared.record_duration {
+        let end = start.elapsed();
+        connection
+            .record_duration(collector.artifact_row_id, end)
+            .await;
+    }

     compile_result.and(runtime_result)
 }
@@ -1845,9 +2096,10 @@ async fn bench_published_artifact(
     let shared = SharedBenchmarkConfig {
         artifact_id,
         toolchain,
+        record_duration: true,
     };
     run_benchmarks(
-        connection,
+        connection.as_mut(),
         shared,
         Some(CompileBenchmarkConfig {
             benchmarks: compile_benchmarks,
diff --git a/collector/src/compile/benchmark/mod.rs b/collector/src/compile/benchmark/mod.rs
index 04b03ca07..049e66ee8 100644
--- a/collector/src/compile/benchmark/mod.rs
+++ b/collector/src/compile/benchmark/mod.rs
@@ -114,6 +114,7 @@ impl std::fmt::Display for BenchmarkName {
     }
 }

+#[derive(Clone)]
 pub struct Benchmark {
     pub name: BenchmarkName,
     pub path: PathBuf,
diff --git a/collector/src/compile/benchmark/profile.rs b/collector/src/compile/benchmark/profile.rs
index 9e178b7d5..14b300adf 100644
--- a/collector/src/compile/benchmark/profile.rs
+++ b/collector/src/compile/benchmark/profile.rs
@@ -45,12 +45,25 @@ impl Profile {
 impl From<Profile> for database::Profile {
     fn from(value: Profile) -> Self {
         match value {
-            Profile::Check => database::Profile::Check,
-            Profile::Debug => database::Profile::Debug,
-            Profile::Doc => database::Profile::Doc,
-            Profile::DocJson => database::Profile::DocJson,
-            Profile::Opt => database::Profile::Opt,
-            Profile::Clippy => database::Profile::Clippy,
+            Profile::Check => Self::Check,
+            Profile::Debug => Self::Debug,
+            Profile::Doc => Self::Doc,
+            Profile::DocJson => Self::DocJson,
+            Profile::Opt => Self::Opt,
+            Profile::Clippy => Self::Clippy,
+        }
+    }
+}
+
+impl From<database::Profile> for Profile {
+    fn from(value: database::Profile) -> Self {
+        match value {
+            database::Profile::Check => Self::Check,
+            database::Profile::Debug => Self::Debug,
+            database::Profile::Doc => Self::Doc,
+            database::Profile::DocJson => Self::DocJson,
+            database::Profile::Opt => Self::Opt,
+            database::Profile::Clippy => Self::Clippy,
         }
     }
 }
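
The two `From` impls above are intended to be mutual inverses. A quick property check that could live in the collector's tests, assuming `Profile` is `Copy + PartialEq + Debug` as the clap-derived enum is:

#[test]
fn profile_roundtrip() {
    for profile in [
        Profile::Check,
        Profile::Debug,
        Profile::Doc,
        Profile::DocJson,
        Profile::Opt,
        Profile::Clippy,
    ] {
        // collector::Profile -> database::Profile -> collector::Profile
        let db: database::Profile = profile.into();
        assert_eq!(Profile::from(db), profile);
    }
}
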
diff --git a/collector/src/toolchain.rs b/collector/src/toolchain.rs
index 666649614..621382f96 100644
--- a/collector/src/toolchain.rs
+++ b/collector/src/toolchain.rs
@@ -2,8 +2,9 @@ use crate::compile::benchmark::codegen_backend::CodegenBackend;
 use crate::compile::benchmark::profile::Profile;
 use anyhow::{anyhow, Context};
 use log::debug;
+use reqwest::StatusCode;
 use std::ffi::OsStr;
-use std::fs::{self, File};
+use std::fs;
 use std::io::{BufReader, Read};
 use std::path::{Path, PathBuf};
 use std::process::Command;
@@ -11,39 +12,83 @@ use std::{fmt, str};
 use tar::Archive;
 use xz2::bufread::XzDecoder;

+/// Error that can occur while downloading a sysroot from CI.
+pub enum SysrootDownloadError {
+    SysrootShaNotFound,
+    IO(anyhow::Error),
+}
+
+impl SysrootDownloadError {
+    pub fn as_anyhow_error(self) -> anyhow::Error {
+        match self {
+            SysrootDownloadError::SysrootShaNotFound => {
+                anyhow::anyhow!("Sysroot was not found on CI")
+            }
+            SysrootDownloadError::IO(error) => error,
+        }
+    }
+}
+
 /// Sysroot downloaded from CI.
 pub struct Sysroot {
-    pub sha: String,
+    sha: String,
     pub components: ToolchainComponents,
-    pub triple: String,
-    pub preserve: bool,
+    triple: String,
+    preserve: bool,
 }

 impl Sysroot {
     pub async fn install(
+        cache_directory: &Path,
         sha: String,
         triple: &str,
         backends: &[CodegenBackend],
-    ) -> anyhow::Result<Self> {
-        let unpack_into = "cache";
-
-        fs::create_dir_all(unpack_into)?;
+    ) -> Result<Self, SysrootDownloadError> {
+        let cache_directory = cache_directory.join(triple).join(&sha);
+        fs::create_dir_all(&cache_directory).map_err(|e| SysrootDownloadError::IO(e.into()))?;

         let download = SysrootDownload {
-            directory: unpack_into.into(),
-            rust_sha: sha,
+            cache_directory: cache_directory.clone(),
+            rust_sha: sha.clone(),
             triple: triple.to_owned(),
         };

-        download.get_and_extract(Component::Rustc).await?;
-        download.get_and_extract(Component::Std).await?;
-        download.get_and_extract(Component::Cargo).await?;
-        download.get_and_extract(Component::RustSrc).await?;
-        if backends.contains(&CodegenBackend::Cranelift) {
-            download.get_and_extract(Component::Cranelift).await?;
+        let requires_cranelift = backends.contains(&CodegenBackend::Cranelift);
+
+        let stamp = SysrootStamp::load_from_dir(&cache_directory);
+        match stamp {
+            Ok(stamp) => {
+                log::debug!("Found existing stamp for {sha}/{triple}: {stamp:?}");
+                // We should already have a complete sysroot present on disk; check whether
+                // we need to download any optional components.
+                if requires_cranelift && !stamp.cranelift {
+                    download.get_and_extract(Component::Cranelift).await?;
+                }
+            }
+            Err(_) => {
+                log::debug!(
+                    "No existing stamp found for {sha}/{triple}, downloading a fresh sysroot"
+                );
+
+                // There is no stamp, download everything
+                download.get_and_extract(Component::Rustc).await?;
+                download.get_and_extract(Component::Std).await?;
+                download.get_and_extract(Component::Cargo).await?;
+                download.get_and_extract(Component::RustSrc).await?;
+                if requires_cranelift {
+                    download.get_and_extract(Component::Cranelift).await?;
+                }
+            }
         }

-        let sysroot = download.into_sysroot()?;
+        // Update the stamp
+        let stamp = SysrootStamp {
+            cranelift: requires_cranelift,
+        };
+        stamp
+            .store_to_dir(&cache_directory)
+            .map_err(SysrootDownloadError::IO)?;
+
+        let sysroot = download.into_sysroot().map_err(SysrootDownloadError::IO)?;

         Ok(sysroot)
     }
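
The cache layout produced by the new `install` signature is one directory per (triple, sha) pair under the root supplied by the caller (`cache` in the collector binary). A small sketch of the path derivation (paths illustrative):

use std::path::{Path, PathBuf};

// Mirrors `cache_directory.join(triple).join(&sha)` in install() above.
fn toolchain_cache_dir(root: &Path, triple: &str, sha: &str) -> PathBuf {
    root.join(triple).join(sha)
}

fn main() {
    let dir = toolchain_cache_dir(Path::new("cache"), "x86_64-unknown-linux-gnu", "0123abcd");
    // => cache/x86_64-unknown-linux-gnu/0123abcd, which will contain bin/, lib/
    //    and, once a download completes, .sysroot-stamp.json
    assert_eq!(dir, Path::new("cache/x86_64-unknown-linux-gnu/0123abcd"));
}
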
@@ -68,9 +113,32 @@ impl Drop for Sysroot {
     }
 }

+const SYSROOT_STAMP_FILENAME: &str = ".sysroot-stamp.json";
+
+/// Stores a proof on disk that we have downloaded a complete sysroot.
+/// It is used to avoid redownloading a sysroot if it already exists on disk.
+#[derive(serde::Serialize, serde::Deserialize, Debug)]
+struct SysrootStamp {
+    /// Was Cranelift downloaded as a part of the sysroot?
+    cranelift: bool,
+}
+
+impl SysrootStamp {
+    fn load_from_dir(dir: &Path) -> anyhow::Result<Self> {
+        let data = std::fs::read(dir.join(SYSROOT_STAMP_FILENAME))?;
+        let stamp: SysrootStamp = serde_json::from_slice(&data)?;
+        Ok(stamp)
+    }
+
+    fn store_to_dir(&self, dir: &Path) -> anyhow::Result<()> {
+        let file = std::fs::File::create(dir.join(SYSROOT_STAMP_FILENAME))?;
+        Ok(serde_json::to_writer(file, self)?)
+    }
+}
+
 #[derive(Debug, Clone)]
 struct SysrootDownload {
-    directory: PathBuf,
+    cache_directory: PathBuf,
     rust_sha: String,
     triple: String,
 }
@@ -118,7 +186,7 @@ impl Component {

 impl SysrootDownload {
     fn into_sysroot(self) -> anyhow::Result<Sysroot> {
-        let sysroot_bin_dir = self.directory.join(&self.rust_sha).join("bin");
+        let sysroot_bin_dir = self.cache_directory.join("bin");
         let sysroot_bin = |name| {
             let path = sysroot_bin_dir.join(name);
             path.canonicalize().with_context(|| {
@@ -134,7 +202,7 @@ impl SysrootDownload {
             Some(sysroot_bin("rustdoc")?),
             sysroot_bin("clippy-driver").ok(),
             sysroot_bin("cargo")?,
-            &self.directory.join(&self.rust_sha).join("lib"),
+            &self.cache_directory.join("lib"),
         )?;

         Ok(Sysroot {
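
For reference, the stamp written above is a single-field JSON document. A round-trip through serde_json, assuming the `SysrootStamp` struct exactly as defined above:

fn main() {
    let stamp = SysrootStamp { cranelift: true };
    let json = serde_json::to_string(&stamp).unwrap();
    // The entire contents of .sysroot-stamp.json:
    assert_eq!(json, r#"{"cranelift":true}"#);
}
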
@@ -145,24 +213,7 @@ impl SysrootDownload {
-    async fn get_and_extract(&self, component: Component) -> anyhow::Result<()> {
-        let archive_path = self.directory.join(format!(
-            "{}-{}-{}.tar.xz",
-            self.rust_sha, self.triple, component,
-        ));
-        if archive_path.exists() {
-            let reader = BufReader::new(File::open(&archive_path)?);
-            let decompress = XzDecoder::new(reader);
-            let extract = self.extract(component, decompress);
-            match extract {
-                Ok(()) => return Ok(()),
-                Err(err) => {
-                    log::warn!("extracting {} failed: {:?}", archive_path.display(), err);
-                    fs::remove_file(&archive_path).context("removing archive_path")?;
-                }
-            }
-        }
-
+    async fn get_and_extract(&self, component: Component) -> Result<(), SysrootDownloadError> {
         // We usually have nightlies but we want to avoid breaking down if we
         // accidentally end up with a beta or stable commit.
         let urls = [
@@ -170,29 +221,50 @@ impl SysrootDownload {
             component.url("beta", self, &self.triple),
             component.url("stable", self, &self.triple),
         ];
+
+        // Did we see any error other than 404?
+        let mut found_error_that_is_not_404 = false;
         for url in &urls {
             log::debug!("requesting: {}", url);
-            let resp = reqwest::get(url).await?;
-            log::debug!("{}", resp.status());
-            if resp.status().is_success() {
-                let bytes: Vec<u8> = resp.bytes().await?.into();
-                let reader = XzDecoder::new(BufReader::new(bytes.as_slice()));
-                match self.extract(component, reader) {
-                    Ok(()) => return Ok(()),
-                    Err(err) => {
-                        log::warn!("extracting {} failed: {:?}", url, err);
+            let resp = reqwest::get(url)
+                .await
+                .map_err(|e| SysrootDownloadError::IO(e.into()))?;
+            log::debug!("response status: {}", resp.status());
+
+            match resp.status() {
+                s if s.is_success() => {
+                    let bytes: Vec<u8> = resp
+                        .bytes()
+                        .await
+                        .map_err(|e| SysrootDownloadError::IO(e.into()))?
+                        .into();
+                    let reader = XzDecoder::new(BufReader::new(bytes.as_slice()));
+                    match self.extract(component, reader) {
+                        Ok(()) => return Ok(()),
+                        Err(err) => {
+                            log::warn!("extracting {url} failed: {err:?}");
+                            found_error_that_is_not_404 = true;
+                        }
                     }
                 }
+                StatusCode::NOT_FOUND => {}
+                _ => {
+                    log::error!("response body: {}", resp.text().await.unwrap_or_default());
+                    found_error_that_is_not_404 = true;
+                }
             }
         }

-        Err(anyhow!(
-            "unable to download sha {} triple {} module {} from any of {:?}",
-            self.rust_sha,
-            self.triple,
-            component,
-            urls
-        ))
+        if !found_error_that_is_not_404 {
+            // The only errors we saw were 404s, so we assume that the toolchain is simply not on CI
+            Err(SysrootDownloadError::SysrootShaNotFound)
+        } else {
+            Err(SysrootDownloadError::IO(anyhow!(
+                "unable to download sha {} triple {} module {component} from any of {urls:?}",
+                self.rust_sha,
+                self.triple,
+            )))
+        }
     }

     fn extract<T: Read>(&self, component: Component, reader: T) -> anyhow::Result<()> {
@@ -203,7 +275,7 @@ impl SysrootDownload {
             _ => component.to_string(),
         };

-        let unpack_into = self.directory.join(&self.rust_sha);
+        let unpack_into = &self.cache_directory;

         for entry in archive.entries()? {
             let mut entry = entry?;
@@ -617,6 +689,7 @@ fn get_lib_dir_from_rustc(rustc: &Path) -> anyhow::Result<PathBuf> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::fs::File;

     #[test]
     fn fill_libraries() {
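
The download policy in `get_and_extract` above boils down to: success on any URL wins; an all-404 outcome means the artifacts were never uploaded for that SHA (a permanent condition); anything else is reported as IO and treated as retryable. A standalone model of that classification (not the collector's API):

enum Attempt {
    NotFound,     // HTTP 404
    OtherFailure, // network error, non-404 status, bad archive
    Extracted,    // component downloaded and unpacked
}

fn classify(attempts: &[Attempt]) -> Result<(), &'static str> {
    if attempts.iter().any(|a| matches!(a, Attempt::Extracted)) {
        return Ok(());
    }
    if attempts.iter().all(|a| matches!(a, Attempt::NotFound)) {
        // Maps to SysrootDownloadError::SysrootShaNotFound.
        Err("sha not on CI")
    } else {
        // Maps to SysrootDownloadError::IO.
        Err("download failed")
    }
}

fn main() {
    assert!(classify(&[Attempt::NotFound, Attempt::Extracted]).is_ok());
    assert_eq!(classify(&[Attempt::NotFound, Attempt::NotFound]), Err("sha not on CI"));
    assert_eq!(classify(&[Attempt::NotFound, Attempt::OtherFailure]), Err("download failed"));
}
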
diff --git a/database/src/lib.rs b/database/src/lib.rs
index a9cd5a280..36ec38ecb 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -1100,6 +1100,10 @@ impl BenchmarkSet {
     pub fn new(id: u32) -> Self {
         Self(id)
     }
+
+    pub fn get_id(&self) -> u32 {
+        self.0
+    }
 }

 /// A single unit of work generated from a benchmark request. Split by profiles
diff --git a/database/src/pool.rs b/database/src/pool.rs
index 3053cae79..ba673489f 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -269,13 +269,16 @@ pub trait Connection: Send + Sync {
     async fn mark_benchmark_job_as_completed(
         &self,
         id: u32,
-        benchmark_job_conculsion: BenchmarkJobConclusion,
+        conclusion: BenchmarkJobConclusion,
     ) -> anyhow::Result<()>;

     async fn get_status_page_data(&self) -> anyhow::Result<StatusPageData>;

     /// Get all of the configuration for all of the collectors
     async fn get_collector_configs(&self) -> anyhow::Result<Vec<CollectorConfig>>;
+
+    /// Updates the last known heartbeat of a collector to the current time.
+    async fn update_collector_heartbeat(&self, collector_name: &str) -> anyhow::Result<()>;
 }

 #[async_trait::async_trait]
@@ -1149,11 +1152,11 @@ mod tests {
         assert_eq!(collector_configs[0].name(), collector_name_one);
         assert_eq!(collector_configs[0].benchmark_set(), benchmark_set_one);
-        assert_eq!(collector_configs[0].is_active(), true);
+        assert!(collector_configs[0].is_active());

         assert_eq!(collector_configs[1].name(), collector_name_two);
         assert_eq!(collector_configs[1].benchmark_set(), benchmark_set_two);
-        assert_eq!(collector_configs[1].is_active(), true);
+        assert!(collector_configs[1].is_active());

         Ok(ctx)
     })
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 77141fe5e..3ea37270e 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1776,7 +1776,7 @@ where
         benchmark_set: BenchmarkSet,
     ) -> anyhow::Result<Option<(BenchmarkJob, ArtifactId)>> {
         // We take the oldest job from the job_queue matching the benchmark_set,
-        // target and status of 'queued'
+        // target and status of 'queued' or 'in_progress'
         // If a job was dequeued, we increment its retry (dequeue) count
         let row_opt = self
             .conn()
@@ -1788,11 +1788,19 @@ where
                 FROM
                     job_queue
                 WHERE
-                    status = $1
+                    -- Take queued or in-progress jobs
+                    (status = $1 OR status = $5)
                     AND target = $2
                     AND benchmark_set = $3
-                ORDER
-                BY created_at
+                ORDER BY
+                    -- Prefer in-progress jobs that have not been finished previously, so that
+                    -- we can finish them.
+                    CASE
+                        WHEN status = $5 THEN 0
+                        WHEN status = $1 THEN 1
+                        ELSE 2
+                    END,
+                    created_at
                 LIMIT 1
                 FOR UPDATE SKIP LOCKED
             ), updated AS (
@@ -2204,6 +2212,20 @@ where

         Ok(configs)
     }
+
+    async fn update_collector_heartbeat(&self, collector_name: &str) -> anyhow::Result<()> {
+        self.conn()
+            .query(
+                r#"
+                UPDATE collector_config
+                SET last_heartbeat_at = NOW()
+                WHERE name = $1
+                "#,
+                &[&collector_name],
+            )
+            .await?;
+        Ok(())
+    }
 }

 fn row_to_benchmark_request(row: &Row) -> BenchmarkRequest {
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index 692069d55..08d011f46 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1365,7 +1365,7 @@ impl Connection for SqliteConnection {
     async fn mark_benchmark_job_as_completed(
         &self,
         _id: u32,
-        _benchmark_job_conculsion: BenchmarkJobConclusion,
+        _conclusion: BenchmarkJobConclusion,
     ) -> anyhow::Result<()> {
         no_queue_implementation_abort!()
     }
@@ -1377,6 +1377,10 @@ impl Connection for SqliteConnection {
     async fn get_collector_configs(&self) -> anyhow::Result<Vec<CollectorConfig>> {
         no_queue_implementation_abort!()
     }
+
+    async fn update_collector_heartbeat(&self, _collector_name: &str) -> anyhow::Result<()> {
+        no_queue_implementation_abort!()
+    }
 }

 fn parse_artifact_id(ty: &str, sha: &str, date: Option<i64>) -> ArtifactId {
diff --git a/site/src/request_handlers/github.rs b/site/src/request_handlers/github.rs
index 17bab3cd1..5eb76a6e4 100644
--- a/site/src/request_handlers/github.rs
+++ b/site/src/request_handlers/github.rs
@@ -464,7 +464,7 @@ Otherwise LGTM."#),
         @r#"Some(Ok(QueueCommand { params: BenchmarkParameters { include: Some("foo,bar"), exclude: None, runs: None, backends: None } }))"#);
     }

-    fn get_build_commands(body: &str) -> Vec<Result<&str, String>> {
+    fn get_build_commands(body: &str) -> Vec<Result<BuildCommand<'_>, String>> {
         parse_build_commands(body).collect()
     }
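
Finally, the ORDER BY in the postgres dequeue query above encodes a two-level priority: previously dequeued `in_progress` jobs are picked up before fresh `queued` ones, and ties fall back to creation time. The same ordering expressed in Rust as a sanity check of the intent (illustrative values):

fn priority(status: &str) -> u8 {
    match status {
        "in_progress" => 0, // abandoned by a crashed/restarted collector
        "queued" => 1,
        _ => 2,
    }
}

fn main() {
    let mut jobs = vec![("queued", 1), ("in_progress", 5), ("queued", 3)];
    jobs.sort_by_key(|&(status, created_at)| (priority(status), created_at));
    // The abandoned in-progress job wins even though it was created last.
    assert_eq!(jobs[0], ("in_progress", 5));
}
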