Skip to content

Add job benchmark loop #2226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
371 changes: 314 additions & 57 deletions collector/src/bin/collector.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions collector/src/compile/benchmark/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl std::fmt::Display for BenchmarkName {
}
}

#[derive(Clone)]
pub struct Benchmark {
pub name: BenchmarkName,
pub path: PathBuf,
Expand Down
25 changes: 19 additions & 6 deletions collector/src/compile/benchmark/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,25 @@ impl Profile {
impl From<Profile> for database::Profile {
fn from(value: Profile) -> Self {
match value {
Profile::Check => database::Profile::Check,
Profile::Debug => database::Profile::Debug,
Profile::Doc => database::Profile::Doc,
Profile::DocJson => database::Profile::DocJson,
Profile::Opt => database::Profile::Opt,
Profile::Clippy => database::Profile::Clippy,
Profile::Check => Self::Check,
Profile::Debug => Self::Debug,
Profile::Doc => Self::Doc,
Profile::DocJson => Self::DocJson,
Profile::Opt => Self::Opt,
Profile::Clippy => Self::Clippy,
}
}
}

impl From<database::Profile> for Profile {
fn from(value: database::Profile) -> Self {
match value {
database::Profile::Check => Self::Check,
database::Profile::Debug => Self::Debug,
database::Profile::Doc => Self::Doc,
database::Profile::DocJson => Self::DocJson,
database::Profile::Opt => Self::Opt,
database::Profile::Clippy => Self::Clippy,
}
}
}
178 changes: 124 additions & 54 deletions collector/src/toolchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,93 @@ use crate::compile::benchmark::codegen_backend::CodegenBackend;
use crate::compile::benchmark::profile::Profile;
use anyhow::{anyhow, Context};
use log::debug;
use reqwest::StatusCode;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::fs;
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::{fmt, str};
use tar::Archive;
use xz2::bufread::XzDecoder;

pub enum SysrootDownloadError {
SysrootShaNotFound,
IO(anyhow::Error),
}

impl SysrootDownloadError {
pub fn as_anyhow_error(self) -> anyhow::Error {
match self {
SysrootDownloadError::SysrootShaNotFound => {
anyhow::anyhow!("Sysroot was not found on CI")
}
SysrootDownloadError::IO(error) => error,
}
}
}

/// Sysroot downloaded from CI.
pub struct Sysroot {
pub sha: String,
sha: String,
pub components: ToolchainComponents,
pub triple: String,
pub preserve: bool,
triple: String,
preserve: bool,
}

impl Sysroot {
pub async fn install(
cache_directory: &Path,
sha: String,
triple: &str,
backends: &[CodegenBackend],
) -> anyhow::Result<Self> {
let unpack_into = "cache";

fs::create_dir_all(unpack_into)?;
) -> Result<Self, SysrootDownloadError> {
let cache_directory = cache_directory.join(triple).join(&sha);
fs::create_dir_all(&cache_directory).map_err(|e| SysrootDownloadError::IO(e.into()))?;

let download = SysrootDownload {
directory: unpack_into.into(),
rust_sha: sha,
cache_directory: cache_directory.clone(),
rust_sha: sha.clone(),
triple: triple.to_owned(),
};

download.get_and_extract(Component::Rustc).await?;
download.get_and_extract(Component::Std).await?;
download.get_and_extract(Component::Cargo).await?;
download.get_and_extract(Component::RustSrc).await?;
if backends.contains(&CodegenBackend::Cranelift) {
download.get_and_extract(Component::Cranelift).await?;
let requires_cranelift = backends.contains(&CodegenBackend::Cranelift);

let stamp = SysrootStamp::load_from_dir(&cache_directory);
match stamp {
Ok(stamp) => {
log::debug!("Found existing stamp for {sha}/{triple}: {stamp:?}");
// We should already have a complete sysroot present on disk, check if we need to
// download optional components
if requires_cranelift && !stamp.cranelift {
download.get_and_extract(Component::Cranelift).await?;
}
}
Err(_) => {
log::debug!(
"No existing stamp found for {sha}/{triple}, downloading a fresh sysroot"
);

// There is no stamp, download everything
download.get_and_extract(Component::Rustc).await?;
download.get_and_extract(Component::Std).await?;
download.get_and_extract(Component::Cargo).await?;
download.get_and_extract(Component::RustSrc).await?;
if requires_cranelift {
download.get_and_extract(Component::Cranelift).await?;
}
}
}

let sysroot = download.into_sysroot()?;
// Update the stamp
let stamp = SysrootStamp {
cranelift: requires_cranelift,
};
stamp
.store_to_dir(&cache_directory)
.map_err(SysrootDownloadError::IO)?;

let sysroot = download.into_sysroot().map_err(SysrootDownloadError::IO)?;

Ok(sysroot)
}
Expand All @@ -68,9 +113,32 @@ impl Drop for Sysroot {
}
}

const SYSROOT_STAMP_FILENAME: &str = ".sysroot-stamp.json";

/// Stores a proof on disk that we have downloaded a complete sysroot.
/// It is used to avoid redownloading a sysroot if it already exists on disk.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct SysrootStamp {
/// Was Cranelift downloaded as a part of the sysroot?
cranelift: bool,
}

impl SysrootStamp {
fn load_from_dir(dir: &Path) -> anyhow::Result<Self> {
let data = std::fs::read(dir.join(SYSROOT_STAMP_FILENAME))?;
let stamp: SysrootStamp = serde_json::from_slice(&data)?;
Ok(stamp)
}

fn store_to_dir(&self, dir: &Path) -> anyhow::Result<()> {
let file = std::fs::File::create(dir.join(SYSROOT_STAMP_FILENAME))?;
Ok(serde_json::to_writer(file, self)?)
}
}

#[derive(Debug, Clone)]
struct SysrootDownload {
directory: PathBuf,
cache_directory: PathBuf,
rust_sha: String,
triple: String,
}
Expand Down Expand Up @@ -118,7 +186,7 @@ impl Component {

impl SysrootDownload {
fn into_sysroot(self) -> anyhow::Result<Sysroot> {
let sysroot_bin_dir = self.directory.join(&self.rust_sha).join("bin");
let sysroot_bin_dir = self.cache_directory.join("bin");
let sysroot_bin = |name| {
let path = sysroot_bin_dir.join(name);
path.canonicalize().with_context(|| {
Expand All @@ -134,7 +202,7 @@ impl SysrootDownload {
Some(sysroot_bin("rustdoc")?),
sysroot_bin("clippy-driver").ok(),
sysroot_bin("cargo")?,
&self.directory.join(&self.rust_sha).join("lib"),
&self.cache_directory.join("lib"),
)?;

Ok(Sysroot {
Expand All @@ -145,54 +213,55 @@ impl SysrootDownload {
})
}

async fn get_and_extract(&self, component: Component) -> anyhow::Result<()> {
let archive_path = self.directory.join(format!(
"{}-{}-{}.tar.xz",
self.rust_sha, self.triple, component,
));
if archive_path.exists() {
let reader = BufReader::new(File::open(&archive_path)?);
let decompress = XzDecoder::new(reader);
let extract = self.extract(component, decompress);
match extract {
Ok(()) => return Ok(()),
Err(err) => {
log::warn!("extracting {} failed: {:?}", archive_path.display(), err);
fs::remove_file(&archive_path).context("removing archive_path")?;
}
}
}

async fn get_and_extract(&self, component: Component) -> Result<(), SysrootDownloadError> {
// We usually have nightlies but we want to avoid breaking down if we
// accidentally end up with a beta or stable commit.
let urls = [
component.url("nightly", self, &self.triple),
component.url("beta", self, &self.triple),
component.url("stable", self, &self.triple),
];

// Did we see any other error than 404?
let mut found_error_that_is_not_404 = false;
for url in &urls {
log::debug!("requesting: {}", url);
let resp = reqwest::get(url).await?;
let resp = reqwest::get(url)
.await
.map_err(|e| SysrootDownloadError::IO(e.into()))?;
log::debug!("{}", resp.status());
if resp.status().is_success() {
let bytes: Vec<u8> = resp.bytes().await?.into();
let reader = XzDecoder::new(BufReader::new(bytes.as_slice()));
match self.extract(component, reader) {
Ok(()) => return Ok(()),
Err(err) => {
log::warn!("extracting {} failed: {:?}", url, err);

match resp.status() {
s if s.is_success() => {
let bytes: Vec<u8> = resp
.bytes()
.await
.map_err(|e| SysrootDownloadError::IO(e.into()))?
.into();
let reader = XzDecoder::new(BufReader::new(bytes.as_slice()));
match self.extract(component, reader) {
Ok(()) => return Ok(()),
Err(err) => {
log::warn!("extracting {url} failed: {err:?}");
found_error_that_is_not_404 = true;
}
}
}
StatusCode::NOT_FOUND => {}
_ => found_error_that_is_not_404 = true,
}
}

Err(anyhow!(
"unable to download sha {} triple {} module {} from any of {:?}",
self.rust_sha,
self.triple,
component,
urls
))
if !found_error_that_is_not_404 {
// The only errors we saw were 404, so we assume that the toolchain is simply not on CI
Err(SysrootDownloadError::SysrootShaNotFound)
} else {
Err(SysrootDownloadError::IO(anyhow!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use resp.error_for_status() so we can get the actual error message too from the response which could be useful for debugging; https://docs.rs/reqwest/latest/reqwest/struct.Response.html#method.error_for_status

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error_for_status() is nice, but the reason why we don't provide more context here is simply because we have up to three (potentially different) errors. Now that we detect 404s explicitly, we could just bail out on the first non-404 error, but I'm a bit worried about backwards compatibility, I don't know if "toolchain not found" is always reported with a 404.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For logging purposes, it would be useful to distinguish between a 500 or other HTTP status codes, since they could indicate different underlying issues. At present, these are all reported as SysrootDownloadError::SysrootShaNotFound, which is overly specific and potentially misleading given the range of errors that could occur

Copy link
Member Author

@Kobzol Kobzol Aug 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I follow. SysrootShaNotFound is returned if we see only 404 errors. If we ever see a different error than 404, we won't return SysrootShaNotFound.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can invert it;

if is_404 {
    // The only errors we saw were 404, so we assume that the toolchain is simply not on CI
    Err(SysrootDownloadError::SysrootShaNotFound)
} else {
    Err(SysrootDownloadError::IO(anyhow!("unable to download sha {} triple {} module {component} from any of {urls:?} error: {:?}",
         self.rust_sha,
         self.triple,
         resp.error_for_status() // <- so we can see what the error is
    ))
}

I still think it is valuable to be able to see what the error is for the HTTP request, either through logging or making it part of the error so we don't lose information that could be useful.

"unable to download sha {} triple {} module {component} from any of {urls:?}",
self.rust_sha,
self.triple,
)))
}
}

fn extract<T: Read>(&self, component: Component, reader: T) -> anyhow::Result<()> {
Expand All @@ -203,7 +272,7 @@ impl SysrootDownload {
_ => component.to_string(),
};

let unpack_into = self.directory.join(&self.rust_sha);
let unpack_into = &self.cache_directory;

for entry in archive.entries()? {
let mut entry = entry?;
Expand Down Expand Up @@ -617,6 +686,7 @@ fn get_lib_dir_from_rustc(rustc: &Path) -> anyhow::Result<PathBuf> {
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;

#[test]
fn fill_libraries() {
Expand Down
4 changes: 4 additions & 0 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,10 @@ impl BenchmarkSet {
pub fn new(id: u32) -> Self {
Self(id)
}

pub fn get_id(&self) -> u32 {
self.0
}
}

/// A single unit of work generated from a benchmark request. Split by profiles
Expand Down
5 changes: 4 additions & 1 deletion database/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,16 @@ pub trait Connection: Send + Sync {
async fn mark_benchmark_job_as_completed(
&self,
id: u32,
benchmark_job_conculsion: BenchmarkJobConclusion,
conclusion: BenchmarkJobConclusion,
) -> anyhow::Result<()>;

async fn get_status_page_data(&self) -> anyhow::Result<PartialStatusPageData>;

/// Get all of the configuration for all of the collectors
async fn get_collector_configs(&self) -> anyhow::Result<Vec<CollectorConfig>>;

/// Updates the last known heartbeat of a collector to the current time.
async fn update_collector_heartbeat(&self, collector_name: &str) -> anyhow::Result<()>;
}

#[async_trait::async_trait]
Expand Down
30 changes: 26 additions & 4 deletions database/src/pool/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,7 +1776,7 @@ where
benchmark_set: BenchmarkSet,
) -> anyhow::Result<Option<(BenchmarkJob, ArtifactId)>> {
// We take the oldest job from the job_queue matching the benchmark_set,
// target and status of 'queued'
// target and status of 'queued' or 'in_progress'
// If a job was dequeued, we increment its retry (dequeue) count
let row_opt = self
.conn()
Expand All @@ -1788,11 +1788,19 @@ where
FROM
job_queue
WHERE
status = $1
-- Take queued or in-progress jobs
(status = $1 OR status = $5)
AND target = $2
AND benchmark_set = $3
ORDER
BY created_at
ORDER BY
-- Prefer in-progress jobs that have not been finished previously, so that
-- we can finish them.
CASE
WHEN status = $5 THEN 0
WHEN status = $1 THEN 1
ELSE 2
END,
created_at
LIMIT 1
FOR UPDATE SKIP LOCKED
), updated AS (
Expand Down Expand Up @@ -2204,6 +2212,20 @@ where

Ok(configs)
}

async fn update_collector_heartbeat(&self, collector_name: &str) -> anyhow::Result<()> {
self.conn()
.query(
r#"
UPDATE collector_config
SET last_heartbeat_at = NOW()
WHERE name = $1
"#,
&[&collector_name],
)
.await?;
Ok(())
}
}

fn row_to_benchmark_request(row: &Row) -> BenchmarkRequest {
Expand Down
Loading
Loading