When you’re writing network services, one decision you need to make is how to handle restarting or updating your service.
A naïve (but sometimes acceptable) approach is to simply shut down the service and start a new copy. Any clients that are currently connected will have their connections interrupted, and any clients that attempt to connect between the previous copy shutting down and the new one starting up will hit some kind of ‘connection refused’ error.
Depending on the clients, this might be perfectly acceptable - if you know the client will handle interrupted connections by re-connecting, and will retry in the face of connection errors, then this might not even be visible to the end-user.
Ideally though, you’d like to shut down the service ‘gracefully’ - that means:
- Stop accepting new connections
- Complete all in-progress connections
- Exit when in-progress connections are complete
Ideally, all of these happen as soon as feasible, without unnecessary pauses.
This doesn’t solve the whole problem, since you’ve still got downtime between one copy stopping and the next starting up, however once you’ve got this in place you’re then in a position to implement more interesting restart strategies. For example, you could:
- Use a supervisor service to create the socket listener, and hand it over between copies of the program so there’s only a slight delay and no failed connections
- Or run multiple instances of the program simultaneously, with a load balancer that detects when one instance stops accepting connections and seamlessly fails over to the others
- Use DNS to provide multiple IPs to the client, and let the client handle re-trying with different instances until one connects
We’ll look at these other strategies in a future post; for now let’s focus on the graceful shutdown.
Code sample
Let’s take a look at the sort of code we’ll be enhancing. A typical web service has the following phases:
- Initialise resources, such as
  - Environment variables
  - Socket listener
  - DB connections
  - etc.
- Start background tasks
  - Worker threads (if needed)
  - Any periodic management or cleanup
- Run main worker loop
The main worker loop handles waiting for new incoming connections, accepting them, and deciding what to do with them. Normally you don’t want the main worker actually handling connections itself - if it’s working on a connection, it can’t be accepting new connections, and we won’t gain any benefit from concurrency. Instead, we’ll spawn tasks that can progress independently of the main loop.
We’ll use a couple of helper crates, although these aren’t required: anyhow for error handling and clap for argument parsing.
use clap::Parser;

#[derive(Parser)]
struct Args {
    ...
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Parse arguments (clap's derive API exits with an error message on failure)
    let args = Args::parse();
    // Initialise resources (Db here stands in for your database layer)
    let db_conn = Db::new(&args.db_string).await?;
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8000").await?;
    // Start background tasks
    tokio::spawn(background_tasks());
    // Main worker loop: accept connections and spawn a task for each one
    loop {
        let (socket, _addr) = listener.accept().await?;
        tokio::spawn(handle_connection(socket));
    }
}

async fn background_tasks() { ... }
async fn handle_connection(socket: tokio::net::TcpStream) { ... }
A couple of things to note here:
- We’re using tokio as our async runtime. If you’re not already familiar with tokio, it’s worth working through their tutorial and getting familiar with async first.
- Calling background_tasks and handle_connection doesn’t immediately run their code - each returns a suspended future which we can pass around. In this case we pass it to tokio::spawn to be managed by its worker queue.
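To see this laziness in action, here’s a tiny, self-contained sketch (separate from our service code) showing that calling an async function does nothing until the resulting future is spawned or awaited:

async fn say_hello() {
    println!("hello");
}

#[tokio::main]
async fn main() {
    let fut = say_hello();          // nothing printed yet - just a suspended future
    let handle = tokio::spawn(fut); // the runtime now starts driving it
    handle.await.unwrap();          // wait for the spawned task to finish
}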
At the moment, any termination signal will immediately kill the process. Over the next couple of sections we’ll refine this until it handles shutdown gracefully.
Step 1: knowing when to quit
For the purpose of this article we’ll use SIGINT as our termination signal. SIGINT is what gets triggered when you stop a terminal program by pressing ctrl-c, and it gives programs a chance to detect it and choose their own exit mechanism.
tokio helpfully provides a SIGINT handler, which we can use to detect when SIGINT has been received. It comes in the form of an async function that yields only when SIGINT has been triggered:
async fn handle_sigint() {
tokio::signal::ctrl_c().await.expect("ctrl-c handler failed");
println!("Got SIGINT - exiting!");
// Terminate the program with exit code 0
std::process::exit(0);
}
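As an aside: in production you’ll often also want to treat SIGTERM the same way, since that’s what most process managers send first. The rest of this article only needs ctrl-c, but as a rough sketch using tokio’s Unix signal support (the handle_shutdown_signals name is just for illustration), it might look like this:

use tokio::signal::unix::{signal, SignalKind};

async fn handle_shutdown_signals() {
    let mut sigterm = signal(SignalKind::terminate())
        .expect("failed to install SIGTERM handler");
    // Wait for whichever signal arrives first
    tokio::select! {
        _ = tokio::signal::ctrl_c() => println!("Got SIGINT - exiting!"),
        _ = sigterm.recv() => println!("Got SIGTERM - exiting!"),
    }
    std::process::exit(0);
}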
We can spawn handle_sigint as a background task:
// in main
tokio::spawn(handle_sigint());
loop {
let (socket, _addr) = listener.accept().await?;
tokio::spawn(handle_connection(socket));
}
Great, we’re handling SIGINT! But it’s not much better - we’re still abruptly exiting the program.
Step 2: propagating the shutdown request
Rather than exiting abruptly, let’s pass on the shutdown request to the other tasks.
For this we can use CancellationToken from the tokio_util crate. This has a simple API, with a couple of helpful methods and an extension trait for wrapping other futures (API adjusted slightly for readability - see the original for precise details):
impl CancellationToken {
// Create a new token
fn new() -> CancellationToken { ... }
// Create a duplicate token that cancels at the same time
fn clone(&self) -> CancellationToken { ... }
// Trigger cancellation
fn cancel(&self) { ... }
// Create a future that yields only when this token is cancelled
fn cancelled(&self) -> impl Future { ... }
}
trait FutureExt {
// Wrap a future in a cancellation token. This will either yield:
// Some(x) if the inner future yielded a result
// None if the token was triggered
fn with_cancellation_token(self, token: &CancellationToken) -> impl Future {
// pseudocode
return async move {
race(
self.map(Some),
token.cancelled().map(|()| None)
)
}
...
}
}
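As a small, self-contained sketch of the token in action - here using the token’s run_until_cancelled method, which does essentially the same job as the with_cancellation_token wrapper above - cancellation cuts a slow future short:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let trigger = token.clone();

    // Cancel the token after one second
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(1)).await;
        trigger.cancel();
    });

    // This sleep would take ten seconds, but the cancellation cuts it short
    match token
        .run_until_cancelled(tokio::time::sleep(Duration::from_secs(10)))
        .await
    {
        Some(()) => println!("slept the full ten seconds"),
        None => println!("cancelled after one second"),
    }
}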
This gives us everything we need to stop all the futures as soon as we get SIGINT:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // ... argument parsing, DB setup and listener binding as before ...
    let token = CancellationToken::new();
    // Wrap background tasks in the cancellation token, so they stop
    // as soon as it's triggered
    tokio::spawn(
        background_tasks().with_cancellation_token(token.clone())
    );
    // Register SIGINT handler, which triggers the token when SIGINT arrives
    tokio::spawn(handle_sigint(token.clone()));
    // Run until with_cancellation_token returns None, at which
    // point we exit the `while` loop
    while let Some(conn) = listener
        .accept()
        .with_cancellation_token(&token)
        .await
    {
        let (socket, _addr) = conn?;
        tokio::spawn(handle_connection(socket));
    }
    Ok(())
}
async fn handle_sigint(token: CancellationToken) {
// yield only when SIGINT is triggered
tokio::signal::ctrl_c().await.expect("ctrl-c handler failed");
token.cancel();
}
async fn handle_connection(socket: tokio::net::TcpStream) {
...
}
So what’s happening here?
- Every future passed to tokio::spawn is first wrapped in a CancellationToken
- When SIGINT is triggered, the CancellationToken is cancelled
- Every future immediately stops, yielding None
- The listener loop exits, and therefore exits main()
This is slightly better than the previous form, and means main tidies up and exits cleanly. This still has a problem though - we’re not waiting for connections to complete before exiting.
Let’s resolve this in the next section.
Tracking futures
When we tokio::spawn a future, that future runs in some background task until it has completed. The spawn call returns a JoinHandle object, but the future will run regardless of what we do with the JoinHandle.
If we want to keep track of it, we need to keep hold of that JoinHandle, since it gives us a proxy to find out when the task is complete.
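For example (inside an async context), the task below runs whether or not we keep the handle, but holding on to it lets us wait for - and read - its result:

let handle = tokio::spawn(async {
    // stand-in for a connection being handled
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    42
});
// ... carry on doing other work ...
let result = handle.await.expect("task panicked");
assert_eq!(result, 42);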
A simple option is to store the JoinHandles, and periodically discard any that have completed. When we exit, we can finish off by waiting for any handles that still haven’t terminated:
fn main() {
    ...
    let mut handles = vec![];
    while let Some(conn) = listener
        .accept()
        .with_cancellation_token(&token)
        .await
    {
        let (socket, _addr) = conn?;
        let handle = tokio::spawn(handle_connection(socket));
        handles.push(handle);
        // Drop any handles whose tasks have already finished
        handles.retain(|h| !h.is_finished());
    }
    // with_cancellation_token returned `None` - we're shutting down
    println!("Waiting for connections to terminate");
    // Wait for remaining handles and exit.
    // Uses the `futures` crate: https://docs.rs/futures
    futures::future::join_all(handles).await;
    println!("Exiting");
    // end of main()
}
This filtering-and-waiting approach can be a bit inefficient - we’re constantly scanning the handles list to remove any finished entries. Instead, we can use something like tokio_util’s TaskTracker to do this tracking for us. See the examples in that link for more details.
An alternative option is to use a distinct executor for these tasks - one that can drive these futures specifically. One example is tokio’s LocalSet. The futures won’t run in the background; instead, they rely on the LocalSet being driven forward.
The difference here is that all the futures will run on a single thread, so they’ll run concurrently but not in parallel (i.e. not on multiple threads). Your choice here will depend on your service and its particular needs:
fn main() {
    ...
    let local = tokio::task::LocalSet::new();
    local.run_until(async move {
        while let Some(conn) = listener
            .accept()
            .with_cancellation_token(&token)
            .await
        {
            let (socket, _addr) = conn?;
            // Spawn onto the LocalSet rather than the main runtime
            tokio::task::spawn_local(handle_connection(socket));
        }
        Ok::<_, anyhow::Error>(())
    }).await?;
    // Wait for remaining tasks to complete
    println!("Waiting for connections to terminate");
    local.await;
    println!("Exiting");
}
Revisiting connection shutdown
We’ve now resolved our race condition, at a cost: if any connections are taking too long to process, we’ll wait an unbounded amount of time for them to complete.
This means a rogue client could cause trouble - for example in the case of a slowloris attack, where a malicious client can feed bytes through very slowly in a way that wastes resources.
Now is a good time to think in a bit more detail about what timeout strategy connections should have. Do we allow a per-connection timeout? A whole-program timeout? Or do we accept that we might wait indefinitely?
A reasonable starting point is a staggered approach for different levels, but this will depend on your particular service. For example:
- 10 seconds per connection (globally - regardless of shutdown)
- 20 seconds for all the connections during shutdown
- 30 seconds for the whole program to terminate
For the 30-second timeout, we’ll normally configure whatever scheduler we use (external to the program) to send a SIGTERM or SIGKILL signal if it hasn’t exited 30 seconds after a SIGINT. For the rest, we can handle it internally using the timeout method in tokio:
use std::time::Duration;
use tokio_util::future::FutureExt;

fn main() {
    ...
    let local = tokio::task::LocalSet::new();
    local.run_until(async move {
        while let Some(conn) = listener
            .accept()
            .with_cancellation_token(&token)
            .await
        {
            let (socket, addr) = conn?;
            // Spawn onto the LocalSet, with a 10-second per-connection timeout
            tokio::task::spawn_local(async move {
                if let Err(_) = handle_connection(socket)
                    .timeout(Duration::from_secs(10))
                    .await
                {
                    println!("Connection timeout: {addr}");
                }
            });
        }
        Ok::<_, anyhow::Error>(())
    }).await?;
    // Wait for remaining tasks to complete, for at most 20 seconds
    println!("Waiting for connections to terminate");
    if let Err(_) = local.timeout(Duration::from_secs(20)).await {
        println!("Timed out waiting for connections");
    }
    println!("Exiting");
}
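If you’d rather not rely on the extension-trait form of timeout, the plain tokio::time::timeout function does the same job. Here’s the equivalent spawn call as a sketch:

use std::time::Duration;

// Inside the accept loop, replacing the spawn_local call above
tokio::task::spawn_local(async move {
    let result = tokio::time::timeout(
        Duration::from_secs(10),
        handle_connection(socket),
    )
    .await;
    if result.is_err() {
        println!("Connection timeout: {addr}");
    }
});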
A good rule of thumb is to periodically audit your spawn and await calls, and ensure every spawned future either:
- Will complete in a finite, bounded time
- Or, for long-running tasks, takes a cancellation token passed in, so it can detect cancellation and exit cleanly (see the sketch after this list)
- Or is wrapped in a with_cancellation_token call, so it gets dropped immediately on cancellation
- Or is wrapped in some kind of timeout, so it can’t run unboundedly
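To make the long-running-task bullet concrete, here’s a sketch of background_tasks reworked to accept a token directly (a signature change from the earlier examples) and exit cleanly when cancellation is requested:

use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn background_tasks(token: CancellationToken) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        tokio::select! {
            _ = interval.tick() => {
                // periodic management or cleanup work goes here
            }
            _ = token.cancelled() => {
                // flush any in-progress state, then stop
                println!("Background tasks shutting down");
                return;
            }
        }
    }
}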
Can we do this without Tokio?
We’ve used a few different tokio features in these examples, but nothing here is specific to tokio. For each of these features, there are different crates available with very similar API surfaces, which means you’re not tied to a single runtime implementation.
- https://docs.rs/async-ctrlc/latest/async_ctrlc/ for SIGINT handling
- https://docs.rs/async-executor/latest/async_executor/ for a standalone executor
- https://docs.rs/smol-cancellation-token/0.1.0/smol_cancellation_token/ for propagating cancellations
What about with HTTP frameworks?
Most HTTP frameworks don’t have you manage connections yourself - instead, you pass in a listener, and they handle reading off the socket and processing requests, with code along these lines:
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080");
Server::new().endpoint("/foo", bar).run(listener).await;
}
In this case, most frameworks provide some kind of graceful-shutdown option you can pass on startup, which tells the framework when it should shut down. Then it’s easy enough to pass in a CancellationToken, or an async {} block that wraps the CancellationToken and converts the result to the right type.
With these, you can hook in a similar structure to what we had above, without having to track handles ourselves. For example, with poem:
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let token = CancellationToken::new();
    // Wrap background tasks in the cancellation token, so they stop
    // as soon as it's triggered
    tokio::spawn(
        background_tasks().with_cancellation_token(token.clone())
    );
    // Register SIGINT handler which triggers the token
    tokio::spawn(handle_sigint(token.clone()));
    let app = ...;
    // Tell poem to shut down gracefully once the token is cancelled
    poem::Server::new(TcpListener::bind("127.0.0.1:8000"))
        .run_with_graceful_shutdown(app, async move { token.cancelled().await }, None)
        .await?;
    Ok(())
}
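Other frameworks expose the same hook under different names. For instance, with axum (assuming its 0.7-style serve API), the equivalent sketch would be:

// `app` is an axum Router, `token` the same CancellationToken as above
let listener = tokio::net::TcpListener::bind("127.0.0.1:8000").await?;
axum::serve(listener, app)
    .with_graceful_shutdown(async move { token.cancelled().await })
    .await?;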
Recap
We’ve looked at techniques for gracefully handling shutdown in an async Rust program. As a general structure, this involves:
- Making sure connections have suitable timeouts
- Spotting termination requests - e.g. by detecting SIGINT signals
- Propagating the termination request, using a Cancellation Token, to stop picking up more connections
- Waiting for connections to terminate, using an executor or task tracker
As a result, we now have a program that gracefully shuts down on a SIGINT signal - taking no longer than it needs to, but still waiting for outstanding connections to complete.
In the next article, we’ll combine this with a scheduler to implement Zero Downtime Deployments - see you then!