Skip to content

Commit a0cc410

Browse files
committed
oneshot channel first commit
1 parent 32cd911 commit a0cc410

File tree

2 files changed

+234
-0
lines changed

2 files changed

+234
-0
lines changed

library/std/src/sync/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ pub use self::poison::{MappedMutexGuard, MappedRwLockReadGuard, MappedRwLockWrit
224224
#[unstable(feature = "mpmc_channel", issue = "126840")]
225225
pub mod mpmc;
226226
pub mod mpsc;
227+
#[unstable(feature = "oneshot_channel", issue = "143674")]
228+
pub mod oneshot;
227229

228230
#[unstable(feature = "sync_poison_mod", issue = "134646")]
229231
pub mod poison;

library/std/src/sync/oneshot.rs

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
//! A single-producer, single-consumer (oneshot) channel.
2+
//!
3+
//! TODO more docs.
4+
5+
use crate::sync::mpmc;
6+
use crate::sync::mpsc::{RecvError, SendError};
7+
use crate::time::{Duration, Instant};
8+
use crate::{error, fmt};
9+
10+
/// Creates a new oneshot channel, returning the sender/receiver halves.
11+
///
12+
/// TODO more docs.
13+
#[must_use]
14+
#[unstable(feature = "oneshot_channel", issue = "143674")]
15+
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
16+
let (tx, rx) = mpmc::channel();
17+
(Sender { inner: tx }, Receiver { inner: rx })
18+
}
19+
20+
////////////////////////////////////////////////////////////////////////////////////////////////////
21+
// Sender
22+
////////////////////////////////////////////////////////////////////////////////////////////////////
23+
24+
/// The sending half of a oneshot channel.
25+
///
26+
/// TODO more docs.
27+
#[unstable(feature = "oneshot_channel", issue = "143674")]
28+
pub struct Sender<T> {
29+
/// The `oneshot` channel is simply a wrapper around a `mpmc` channel.
30+
inner: mpmc::Sender<T>,
31+
}
32+
33+
/// The sending end of the channel can be sent between threads, as long as it is not used to
34+
/// receive non-sendable things.
35+
#[unstable(feature = "oneshot_channel", issue = "143674")]
36+
unsafe impl<T: Send> Send for Sender<T> {}
37+
38+
/// FIXME: Try to boil down <https://github.com/rust-lang/rust/pull/111087> into a doc comment.
39+
#[unstable(feature = "oneshot_channel", issue = "143674")]
40+
unsafe impl<T: Send> Sync for Sender<T> {}
41+
42+
impl<T> Sender<T> {
43+
/// Attempts to send a value through this channel. This can only fail if the corresponding
44+
/// `Receiver<T>` has been dropped.
45+
///
46+
/// This method is non-blocking (wait-free).
47+
#[unstable(feature = "oneshot_channel", issue = "143674")]
48+
pub fn send(self, t: T) -> Result<(), SendError<T>> {
49+
self.inner.send(t)
50+
}
51+
}
52+
53+
#[unstable(feature = "oneshot_channel", issue = "143674")]
54+
impl<T> fmt::Debug for Sender<T> {
55+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56+
f.pad("Sender { .. }")
57+
}
58+
}
59+
60+
////////////////////////////////////////////////////////////////////////////////////////////////////
61+
// Receiver
62+
////////////////////////////////////////////////////////////////////////////////////////////////////
63+
64+
/// The receiving half of a oneshot channel.
65+
///
66+
/// TODO more docs.
67+
#[unstable(feature = "oneshot_channel", issue = "143674")]
68+
pub struct Receiver<T> {
69+
/// The `oneshot` channel is simply a wrapper around a `mpmc` channel.
70+
inner: mpmc::Receiver<T>,
71+
}
72+
73+
/// The receiving end of the channel can be sent between threads, as long as it is not used to
74+
/// receive non-sendable things.
75+
#[unstable(feature = "oneshot_channel", issue = "143674")]
76+
unsafe impl<T: Send> Send for Receiver<T> {}
77+
78+
/// FIXME: Why is `mpsc::Receiver` !Sync but `mpmc::Receiver` is? Write this in a doc comment.
79+
#[unstable(feature = "oneshot_channel", issue = "143674")]
80+
impl<T> !Sync for Receiver<T> {}
81+
82+
impl<T> Receiver<T> {
83+
/// Receives the value from the sending end, blocking the calling thread until it gets it.
84+
///
85+
/// Can only fail if the corresponding `Sender<T>` has been dropped.
86+
#[unstable(feature = "oneshot_channel", issue = "143674")]
87+
pub fn recv(self) -> Result<T, RecvError> {
88+
self.inner.recv()
89+
}
90+
91+
// Fallable methods.
92+
93+
/// Attempts to return a pending value on this receiver without blocking.
94+
///
95+
/// TODO examples.
96+
#[unstable(feature = "oneshot_channel", issue = "143674")]
97+
pub fn try_recv(self) -> Result<T, TryRecvError<T>> {
98+
self.inner.try_recv().map_err(|err| match err {
99+
mpmc::TryRecvError::Empty => TryRecvError::Empty(self),
100+
mpmc::TryRecvError::Disconnected => TryRecvError::Disconnected,
101+
})
102+
}
103+
104+
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
105+
/// [`Sender`] half of this channel has been dropped, or if it waits more than `timeout`.
106+
///
107+
/// TODO examples.
108+
#[unstable(feature = "oneshot_channel", issue = "143674")]
109+
pub fn recv_timeout(self, timeout: Duration) -> Result<T, RecvTimeoutError<T>> {
110+
self.inner.recv_timeout(timeout).map_err(|err| match err {
111+
mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self),
112+
mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
113+
})
114+
}
115+
116+
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
117+
/// [`Sender`] half of this channel has been dropped, or if `deadline` is reached.
118+
///
119+
/// TODO examples.
120+
#[unstable(feature = "oneshot_channel", issue = "143674")]
121+
pub fn recv_deadline(self, deadline: Instant) -> Result<T, RecvTimeoutError<T>> {
122+
self.inner.recv_deadline(deadline).map_err(|err| match err {
123+
mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self),
124+
mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
125+
})
126+
}
127+
}
128+
129+
#[unstable(feature = "oneshot_channel", issue = "143674")]
130+
impl<T> fmt::Debug for Receiver<T> {
131+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132+
f.pad("Receiver { .. }")
133+
}
134+
}
135+
136+
////////////////////////////////////////////////////////////////////////////////////////////////////
137+
// Receiver Errors
138+
////////////////////////////////////////////////////////////////////////////////////////////////////
139+
140+
/// An error returned from the [`try_recv`](Receiver::try_recv) method.
141+
#[unstable(feature = "oneshot_channel", issue = "143674")]
142+
pub enum TryRecvError<T> {
143+
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
144+
/// disconnected). This variant contains the [`Receiver`] that [`try_recv`](Receiver::try_recv)
145+
/// took ownership over.
146+
Empty(Receiver<T>),
147+
/// The corresponding [`Sender`] half of this channel has become disconnected, and there will
148+
/// never be any more data sent over the channel.
149+
Disconnected,
150+
}
151+
152+
/// An error returned from the [`recv_timeout`](Receiver::recv_timeout) or
153+
/// [`recv_deadline`](Receiver::recv_deadline) methods.
154+
#[unstable(feature = "oneshot_channel", issue = "143674")]
155+
pub enum RecvTimeoutError<T> {
156+
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
157+
/// disconnected). This variant contains the [`Receiver`] that either
158+
/// [`recv_timeout`](Receiver::recv_timeout) or [`recv_deadline`](Receiver::recv_deadline) took
159+
/// ownership over.
160+
Timeout(Receiver<T>),
161+
/// The corresponding [`Sender`] half of this channel has become disconnected, and there will
162+
/// never be any more data sent over the channel.
163+
Disconnected,
164+
}
165+
166+
#[unstable(feature = "oneshot_channel", issue = "143674")]
167+
impl<T> fmt::Debug for TryRecvError<T> {
168+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169+
"TryRecvError(..)".fmt(f)
170+
}
171+
}
172+
173+
#[unstable(feature = "oneshot_channel", issue = "143674")]
174+
impl<T> fmt::Display for TryRecvError<T> {
175+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176+
match *self {
177+
TryRecvError::Empty(..) => "receiving on an empty channel".fmt(f),
178+
TryRecvError::Disconnected => "receiving on a closed channel".fmt(f),
179+
}
180+
}
181+
}
182+
183+
#[unstable(feature = "oneshot_channel", issue = "143674")]
184+
impl<T> error::Error for TryRecvError<T> {}
185+
186+
#[unstable(feature = "oneshot_channel", issue = "143674")]
187+
impl<T> From<RecvError> for TryRecvError<T> {
188+
/// Converts a `RecvError` into a `TryRecvError`.
189+
///
190+
/// This conversion always returns `TryRecvError::Disconnected`.
191+
///
192+
/// No data is allocated on the heap.
193+
fn from(err: RecvError) -> TryRecvError<T> {
194+
match err {
195+
RecvError => TryRecvError::Disconnected,
196+
}
197+
}
198+
}
199+
200+
#[unstable(feature = "oneshot_channel", issue = "143674")]
201+
impl<T> fmt::Debug for RecvTimeoutError<T> {
202+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203+
"RecvTimeoutError(..)".fmt(f)
204+
}
205+
}
206+
207+
#[unstable(feature = "oneshot_channel", issue = "143674")]
208+
impl<T> fmt::Display for RecvTimeoutError<T> {
209+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210+
match *self {
211+
RecvTimeoutError::Timeout(..) => "timed out waiting on channel".fmt(f),
212+
RecvTimeoutError::Disconnected => "receiving on a closed channel".fmt(f),
213+
}
214+
}
215+
}
216+
217+
#[unstable(feature = "oneshot_channel", issue = "143674")]
218+
impl<T> error::Error for RecvTimeoutError<T> {}
219+
220+
#[unstable(feature = "oneshot_channel", issue = "143674")]
221+
impl<T> From<RecvError> for RecvTimeoutError<T> {
222+
/// Converts a `RecvError` into a `RecvTimeoutError`.
223+
///
224+
/// This conversion always returns `RecvTimeoutError::Disconnected`.
225+
///
226+
/// No data is allocated on the heap.
227+
fn from(err: RecvError) -> RecvTimeoutError<T> {
228+
match err {
229+
RecvError => RecvTimeoutError::Disconnected,
230+
}
231+
}
232+
}

0 commit comments

Comments
 (0)