proc_macro: use crossbeam channels for the proc_macro cross-thread bridge

This is done by injecting the crossbeam channel implementation into the
proc_macro bridge from the server side (rustc_expand), so that proc_macro
itself does not gain a crossbeam dependency.
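
A standalone sketch of that injection pattern follows; it assumes
crossbeam-channel = "0.5" as a dependency of the example crate and uses
String in place of the bridge's Buffer type, so it mirrors the shape of the
changes below rather than rustc's actual bridge internals.

    /// The pipe abstraction defined in proc_macro's bridge; the server side
    /// supplies the concrete implementation.
    trait MessagePipe<T>: Sized {
        /// Create a connected pair of endpoints.
        fn new() -> (Self, Self);
        /// Send a message to the other endpoint.
        fn send(&mut self, value: T);
        /// Receive a message; `None` once the other endpoint is gone.
        fn recv(&mut self) -> Option<T>;
    }

    /// A server-side implementation backed by crossbeam channels.
    struct CrossbeamMessagePipe<T> {
        tx: crossbeam_channel::Sender<T>,
        rx: crossbeam_channel::Receiver<T>,
    }

    impl<T> MessagePipe<T> for CrossbeamMessagePipe<T> {
        fn new() -> (Self, Self) {
            // One single-slot channel per direction; each endpoint keeps the
            // sender for one direction and the receiver for the other.
            let (tx1, rx1) = crossbeam_channel::bounded(1);
            let (tx2, rx2) = crossbeam_channel::bounded(1);
            (
                CrossbeamMessagePipe { tx: tx1, rx: rx2 },
                CrossbeamMessagePipe { tx: tx2, rx: rx1 },
            )
        }

        fn send(&mut self, value: T) {
            self.tx.send(value).unwrap();
        }

        fn recv(&mut self) -> Option<T> {
            self.rx.recv().ok()
        }
    }

    fn main() {
        let (mut server, mut client) = <CrossbeamMessagePipe<String>>::new();

        // "Client" thread: send a request, then block for the reply, the way
        // the proc-macro client round-trips bridge buffers.
        let client_thread = std::thread::spawn(move || {
            client.send("request".to_string());
            client.recv().expect("server hung up")
        });

        // "Server" loop: service requests until the client endpoint is dropped.
        while let Some(req) = server.recv() {
            server.send(format!("reply to {req}"));
        }

        println!("{}", client_thread.join().unwrap());
    }

The single-slot bounded channels are sufficient because each side strictly
alternates send and receive, so at most one message is in flight per
direction.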

In addition, this introduces the -Z proc-macro-execution-strategy
command-line option, which switches rustc to run proc-macros using this
cross-thread executor. With the
changes to the bridge in #98186, #98187, #98188 and #98189, the
performance of the executor should be much closer to same-thread
execution.
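
The flag accepts same-thread (the default) or cross-thread. As a standalone
illustration of that mapping, here is a simplified sketch mirroring the
ProcMacroExecutionStrategy enum and its parse helper from this diff, returning
an Option instead of going through rustc's options! machinery:

    /// How to run proc-macro code when building this crate.
    #[derive(Clone, Copy, PartialEq, Debug)]
    enum ProcMacroExecutionStrategy {
        /// Run the proc-macro code on the same thread as the server.
        SameThread,
        /// Run the proc-macro code on a different thread.
        CrossThread,
    }

    /// Maps a -Z proc-macro-execution-strategy value; anything else is rejected.
    fn parse_execution_strategy(v: Option<&str>) -> Option<ProcMacroExecutionStrategy> {
        match v {
            Some("same-thread") => Some(ProcMacroExecutionStrategy::SameThread),
            Some("cross-thread") => Some(ProcMacroExecutionStrategy::CrossThread),
            _ => None,
        }
    }

    fn main() {
        assert_eq!(
            parse_execution_strategy(Some("cross-thread")),
            Some(ProcMacroExecutionStrategy::CrossThread)
        );
        assert!(parse_execution_strategy(Some("bogus")).is_none());
    }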

In local testing, the crossbeam executor was substantially more
performant than either of the two existing CrossThread strategies, so
they have been removed to keep things simple.
Nika Layzell 2022-06-18 14:15:03 -04:00
parent 2f847b81a0
commit 6d1650fe45
8 changed files with 133 additions and 80 deletions


@@ -3885,6 +3885,7 @@ dependencies = [
name = "rustc_expand"
version = "0.0.0"
dependencies = [
"crossbeam-channel",
"rustc_ast",
"rustc_ast_passes",
"rustc_ast_pretty",


@@ -24,3 +24,4 @@ rustc_parse = { path = "../rustc_parse" }
rustc_session = { path = "../rustc_session" }
smallvec = { version = "1.8.1", features = ["union", "may_dangle"] }
rustc_ast = { path = "../rustc_ast" }
crossbeam-channel = "0.5.0"


@@ -8,10 +8,37 @@ use rustc_ast::tokenstream::{TokenStream, TokenTree};
use rustc_data_structures::sync::Lrc;
use rustc_errors::ErrorGuaranteed;
use rustc_parse::parser::ForceCollect;
use rustc_session::config::ProcMacroExecutionStrategy;
use rustc_span::profiling::SpannedEventArgRecorder;
use rustc_span::{Span, DUMMY_SP};
const EXEC_STRATEGY: pm::bridge::server::SameThread = pm::bridge::server::SameThread;
struct CrossbeamMessagePipe<T> {
tx: crossbeam_channel::Sender<T>,
rx: crossbeam_channel::Receiver<T>,
}
impl<T> pm::bridge::server::MessagePipe<T> for CrossbeamMessagePipe<T> {
fn new() -> (Self, Self) {
let (tx1, rx1) = crossbeam_channel::bounded(1);
let (tx2, rx2) = crossbeam_channel::bounded(1);
(CrossbeamMessagePipe { tx: tx1, rx: rx2 }, CrossbeamMessagePipe { tx: tx2, rx: rx1 })
}
fn send(&mut self, value: T) {
self.tx.send(value).unwrap();
}
fn recv(&mut self) -> Option<T> {
self.rx.recv().ok()
}
}
fn exec_strategy(ecx: &ExtCtxt<'_>) -> impl pm::bridge::server::ExecutionStrategy {
pm::bridge::server::MaybeCrossThread::<CrossbeamMessagePipe<_>>::new(
ecx.sess.opts.unstable_opts.proc_macro_execution_strategy
== ProcMacroExecutionStrategy::CrossThread,
)
}
pub struct BangProcMacro {
pub client: pm::bridge::client::Client<pm::TokenStream, pm::TokenStream>,
@@ -30,8 +57,9 @@ impl base::BangProcMacro for BangProcMacro {
});
let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace).map_err(|e| {
self.client.run(&strategy, server, input, proc_macro_backtrace).map_err(|e| {
let mut err = ecx.struct_span_err(span, "proc macro panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
@@ -59,16 +87,17 @@ impl base::AttrProcMacro for AttrProcMacro {
});
let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
self.client
.run(&EXEC_STRATEGY, server, annotation, annotated, proc_macro_backtrace)
.map_err(|e| {
self.client.run(&strategy, server, annotation, annotated, proc_macro_backtrace).map_err(
|e| {
let mut err = ecx.struct_span_err(span, "custom attribute panicked");
if let Some(s) = e.as_str() {
err.help(&format!("message: {}", s));
}
err.emit()
})
},
)
}
}
@@ -105,8 +134,9 @@ impl MultiItemModifier for DeriveProcMacro {
recorder.record_arg_with_span(ecx.expansion_descr(), span);
});
let proc_macro_backtrace = ecx.ecfg.proc_macro_backtrace;
let strategy = exec_strategy(ecx);
let server = proc_macro_server::Rustc::new(ecx);
match self.client.run(&EXEC_STRATEGY, server, input, proc_macro_backtrace) {
match self.client.run(&strategy, server, input, proc_macro_backtrace) {
Ok(stream) => stream,
Err(e) => {
let mut err = ecx.struct_span_err(span, "proc-macro derive panicked");


@@ -11,7 +11,7 @@ use rustc_session::config::{
};
use rustc_session::config::{
BranchProtection, Externs, OomStrategy, OutputType, OutputTypes, PAuthKey, PacRet,
SymbolManglingVersion, WasiExecModel,
ProcMacroExecutionStrategy, SymbolManglingVersion, WasiExecModel,
};
use rustc_session::config::{CFGuard, ExternEntry, LinkerPluginLto, LtoCli, SwitchWithOptPath};
use rustc_session::lint::Level;
@@ -685,6 +685,7 @@ fn test_unstable_options_tracking_hash() {
untracked!(print_mono_items, Some(String::from("abc")));
untracked!(print_type_sizes, true);
untracked!(proc_macro_backtrace, true);
untracked!(proc_macro_execution_strategy, ProcMacroExecutionStrategy::CrossThread);
untracked!(query_dep_graph, true);
untracked!(save_analysis, true);
untracked!(self_profile, SwitchWithOptPath::Enabled(None));


@@ -2959,3 +2959,13 @@ impl OomStrategy {
}
}
}
/// How to run proc-macro code when building this crate
#[derive(Clone, Copy, PartialEq, Hash, Debug)]
pub enum ProcMacroExecutionStrategy {
/// Run the proc-macro code on the same thread as the server.
SameThread,
/// Run the proc-macro code on a different thread.
CrossThread,
}


@@ -415,6 +415,8 @@ mod desc {
"one of (`none` (default), `basic`, `strong`, or `all`)";
pub const parse_branch_protection: &str =
"a `,` separated combination of `bti`, `b-key`, `pac-ret`, or `leaf`";
pub const parse_proc_macro_execution_strategy: &str =
"one of supported execution strategies (`same-thread`, or `cross-thread`)";
}
mod parse {
@@ -1062,6 +1064,18 @@ mod parse {
}
true
}
pub(crate) fn parse_proc_macro_execution_strategy(
slot: &mut ProcMacroExecutionStrategy,
v: Option<&str>,
) -> bool {
*slot = match v {
Some("same-thread") => ProcMacroExecutionStrategy::SameThread,
Some("cross-thread") => ProcMacroExecutionStrategy::CrossThread,
_ => return false,
};
true
}
}
options! {
@@ -1457,6 +1471,9 @@ options! {
"print layout information for each type encountered (default: no)"),
proc_macro_backtrace: bool = (false, parse_bool, [UNTRACKED],
"show backtraces for panics during proc-macro execution (default: no)"),
proc_macro_execution_strategy: ProcMacroExecutionStrategy = (ProcMacroExecutionStrategy::SameThread,
parse_proc_macro_execution_strategy, [UNTRACKED],
"how to run proc-macro code (default: same-thread)"),
profile: bool = (false, parse_bool, [TRACKED],
"insert profiling code (default: no)"),
profile_closures: bool = (false, parse_no_flag, [UNTRACKED],


@@ -2,6 +2,8 @@
use super::*;
use std::marker::PhantomData;
// FIXME(eddyb) generate the definition of `HandleStore` in `server.rs`.
use super::client::HandleStore;
@@ -143,6 +145,41 @@ pub trait ExecutionStrategy {
) -> Buffer;
}
pub struct MaybeCrossThread<P> {
cross_thread: bool,
marker: PhantomData<P>,
}
impl<P> MaybeCrossThread<P> {
pub const fn new(cross_thread: bool) -> Self {
MaybeCrossThread { cross_thread, marker: PhantomData }
}
}
impl<P> ExecutionStrategy for MaybeCrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
if self.cross_thread {
<CrossThread<P>>::new().run_bridge_and_client(
dispatcher,
input,
run_client,
force_show_panics,
)
} else {
SameThread.run_bridge_and_client(dispatcher, input, run_client, force_show_panics)
}
}
}
pub struct SameThread;
impl ExecutionStrategy for SameThread {
@@ -164,12 +201,18 @@ impl ExecutionStrategy for SameThread {
}
}
// NOTE(eddyb) Two implementations are provided, the second one is a bit
// faster but neither is anywhere near as fast as same-thread execution.
pub struct CrossThread<P>(PhantomData<P>);
pub struct CrossThread1;
impl<P> CrossThread<P> {
pub const fn new() -> Self {
CrossThread(PhantomData)
}
}
impl ExecutionStrategy for CrossThread1 {
impl<P> ExecutionStrategy for CrossThread<P>
where
P: MessagePipe<Buffer> + Send + 'static,
{
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
@@ -177,15 +220,12 @@ impl ExecutionStrategy for CrossThread1 {
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::mpsc::channel;
let (req_tx, req_rx) = channel();
let (res_tx, res_rx) = channel();
let (mut server, mut client) = P::new();
let join_handle = thread::spawn(move || {
let mut dispatch = |buf| {
req_tx.send(buf).unwrap();
res_rx.recv().unwrap()
let mut dispatch = |b: Buffer| -> Buffer {
client.send(b);
client.recv().expect("server died while client waiting for reply")
};
run_client(BridgeConfig {
@@ -196,75 +236,27 @@ impl ExecutionStrategy for CrossThread1 {
})
});
for b in req_rx {
res_tx.send(dispatcher.dispatch(b)).unwrap();
while let Some(b) = server.recv() {
server.send(dispatcher.dispatch(b));
}
join_handle.join().unwrap()
}
}
pub struct CrossThread2;
/// A message pipe used for communicating between server and client threads.
pub trait MessagePipe<T>: Sized {
/// Create a new pair of endpoints for the message pipe.
fn new() -> (Self, Self);
impl ExecutionStrategy for CrossThread2 {
fn run_bridge_and_client(
&self,
dispatcher: &mut impl DispatcherTrait,
input: Buffer,
run_client: extern "C" fn(BridgeConfig<'_>) -> Buffer,
force_show_panics: bool,
) -> Buffer {
use std::sync::{Arc, Mutex};
/// Send a message to the other endpoint of this pipe.
fn send(&mut self, value: T);
enum State<T> {
Req(T),
Res(T),
}
let mut state = Arc::new(Mutex::new(State::Res(Buffer::new())));
let server_thread = thread::current();
let state2 = state.clone();
let join_handle = thread::spawn(move || {
let mut dispatch = |b| {
*state2.lock().unwrap() = State::Req(b);
server_thread.unpark();
loop {
thread::park();
if let State::Res(b) = &mut *state2.lock().unwrap() {
break b.take();
}
}
};
let r = run_client(BridgeConfig {
input,
dispatch: (&mut dispatch).into(),
force_show_panics,
_marker: marker::PhantomData,
});
// Wake up the server so it can exit the dispatch loop.
drop(state2);
server_thread.unpark();
r
});
// Check whether `state2` was dropped, to know when to stop.
while Arc::get_mut(&mut state).is_none() {
thread::park();
let mut b = match &mut *state.lock().unwrap() {
State::Req(b) => b.take(),
_ => continue,
};
b = dispatcher.dispatch(b.take());
*state.lock().unwrap() = State::Res(b);
join_handle.thread().unpark();
}
join_handle.join().unwrap()
}
/// Receive a message from the other endpoint of this pipe.
///
/// Returns `None` if the other end of the pipe has been destroyed, and no
/// message was received.
fn recv(&mut self) -> Option<T>;
}
fn run_server<


@@ -114,6 +114,7 @@
-Z print-mono-items=val -- print the result of the monomorphization collection pass
-Z print-type-sizes=val -- print layout information for each type encountered (default: no)
-Z proc-macro-backtrace=val -- show backtraces for panics during proc-macro execution (default: no)
-Z proc-macro-execution-strategy=val -- how to run proc-macro code (default: same-thread)
-Z profile=val -- insert profiling code (default: no)
-Z profile-closures=val -- profile size of closures
-Z profile-emit=val -- file path to emit profiling data at runtime when using 'profile' (default based on relative source path)