Skip to content

collabse stdout and stderr delta events into one #1787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions codex-rs/core/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::error::Result;
use crate::error::SandboxErr;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandStderrDeltaEvent;
use crate::protocol::ExecCommandStdoutDeltaEvent;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecOutputStream;
use crate::protocol::SandboxPolicy;
use crate::seatbelt::spawn_command_under_seatbelt;
use crate::spawn::StdioPolicy;
Expand Down Expand Up @@ -359,17 +359,15 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(

if let Some(stream) = &stream {
let chunk = tmp[..n].to_vec();
let msg = if is_stderr {
EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent {
call_id: stream.call_id.clone(),
chunk: ByteBuf::from(chunk),
})
} else {
EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent {
call_id: stream.call_id.clone(),
chunk: ByteBuf::from(chunk),
})
};
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stream.call_id.clone(),
stream: if is_stderr {
ExecOutputStream::Stderr
} else {
ExecOutputStream::Stdout
},
chunk: ByteBuf::from(chunk),
});
let event = Event {
id: stream.sub_id.clone(),
msg,
Expand Down
23 changes: 10 additions & 13 deletions codex-rs/core/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,8 @@ pub enum EventMsg {
/// Notification that the server is about to execute a command.
ExecCommandBegin(ExecCommandBeginEvent),

/// Incremental chunk of stdout from a running command.
ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent),

/// Incremental chunk of stderr from a running command.
ExecCommandStderrDelta(ExecCommandStderrDeltaEvent),
/// Incremental chunk of output from a running command.
ExecCommandOutputDelta(ExecCommandOutputDeltaEvent),

ExecCommandEnd(ExecCommandEndEvent),

Expand Down Expand Up @@ -484,19 +481,19 @@ pub struct ExecCommandEndEvent {
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecCommandStdoutDeltaEvent {
/// Identifier for the ExecCommandBegin that produced this chunk.
pub call_id: String,
/// Raw stdout bytes (may not be valid UTF-8).
#[serde(with = "serde_bytes")]
pub chunk: ByteBuf,
#[serde(rename_all = "snake_case")]
pub enum ExecOutputStream {
Stdout,
Stderr,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ExecCommandStderrDeltaEvent {
pub struct ExecCommandOutputDeltaEvent {
/// Identifier for the ExecCommandBegin that produced this chunk.
pub call_id: String,
/// Raw stderr bytes (may not be valid UTF-8).
/// Which stream produced this chunk.
pub stream: ExecOutputStream,
/// Raw bytes from the stream (may not be valid UTF-8).
#[serde(with = "serde_bytes")]
pub chunk: ByteBuf,
}
Expand Down
16 changes: 12 additions & 4 deletions codex-rs/core/tests/exec_stream_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ use codex_core::exec::StdoutStream;
use codex_core::exec::process_exec_tool_call;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandStderrDeltaEvent;
use codex_core::protocol::ExecCommandStdoutDeltaEvent;
use codex_core::protocol::ExecCommandOutputDeltaEvent;
use codex_core::protocol::ExecOutputStream;
use codex_core::protocol::SandboxPolicy;
use tokio::sync::Notify;

fn collect_stdout_events(rx: Receiver<Event>) -> Vec<u8> {
let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() {
if let EventMsg::ExecCommandStdoutDelta(ExecCommandStdoutDeltaEvent { chunk, .. }) = ev.msg
if let EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
stream: ExecOutputStream::Stdout,
chunk,
..
}) = ev.msg
{
out.extend_from_slice(&chunk);
}
Expand Down Expand Up @@ -126,7 +130,11 @@ async fn test_exec_stderr_stream_events_echo() {
// Collect only stderr delta events
let mut err = Vec::new();
while let Ok(ev) = rx.try_recv() {
if let EventMsg::ExecCommandStderrDelta(ExecCommandStderrDeltaEvent { chunk, .. }) = ev.msg
if let EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
stream: ExecOutputStream::Stderr,
chunk,
..
}) = ev.msg
{
err.extend_from_slice(&chunk);
}
Expand Down
3 changes: 1 addition & 2 deletions codex-rs/exec/src/event_processor_with_human_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandStdoutDelta(_) => {}
EventMsg::ExecCommandStderrDelta(_) => {}
EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout,
Expand Down
3 changes: 1 addition & 2 deletions codex-rs/mcp-server/src/codex_tool_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandStdoutDelta(_)
| EventMsg::ExecCommandStderrDelta(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::PatchApplyBegin(_)
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/tui/src/chatwidget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ impl ChatWidget<'_> {
);
self.add_to_history(HistoryCell::new_active_exec_command(command));
}
EventMsg::ExecCommandStdoutDelta(_) => {}
EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: _,
auto_approved,
Expand Down
Loading