Developing Full-Stack Rust Applications with WebSockets, Part 1
This post is the first in a multi-part series covering the development of local web apps, with a focus on the type of application you might need for a connected home IoT product. In this series, I'll walk through standing up a Rust binary which runs on a Linux device and hosts a web app that can be used to configure the device or visualize its data. We'll make use of major Rust libraries like `yew`, `tokio`, and `axum`.
In this post, however, the goal is just to get an HTTP server up and running with an endpoint for issuing a health check and an endpoint for establishing a WebSocket connection. We'll use a command-line tool to verify we can send and receive packets over the WebSocket transport.
The Workspace
Here's a little bit of the lay of the land. There are a couple of conventions I use when setting up a project to make writing all of the code in one VSCode project convenient and setting up CI in one repository easy.

The top level of the repository will look something like this:
```
$ exa --tree -a iot-server/
iot-server
├── .git
│  └── ...
├── .vscode
│  └── settings.json
├── backend
│  ├── Cargo.toml
│  └── src
│     └── main.rs
└── frontend
   ├── Cargo.toml
   └── src
      └── main.rs
```
Don't worry about the contents of the `frontend` project right now; we'll get into that in Part 2.
First, we'll make some changes to `.vscode/settings.json`. The Rust Analyzer plugin will automatically select a `Cargo.toml` and build the project for the host architecture, but we can override it, which will be convenient later while working on the frontend. Set it up to look like this:
```json
{
    "rust-analyzer.check.allTargets": false,
    //"rust-analyzer.cargo.target": "wasm32-unknown-unknown",
    "rust-analyzer.linkedProjects": [
        // Uncomment for the project you're actively working on
        "backend/Cargo.toml",
        //"frontend/Cargo.toml",
    ],
}
```
This will set Rust Analyzer to build the backend for the host architecture. Later, when we start working on the frontend web app, we’ll be compiling for WebAssembly as the target. You can uncomment the target setting and change the linked project, then reload the Rust Analyzer server to get code completion.
Goals for the Backend
Let's talk about the goal for the backend by the end of this post. We'll use `tokio` and `axum` to stand up an HTTP server with two endpoints: one for a health check, which can be used to remotely verify the server is up, and one which upgrades the connection to a WebSocket.
Dependencies
Rather than introduce dependencies piecemeal throughout the post, here are all of the dependencies you'll need for this chapter. Add them to the `Cargo.toml` for the backend project.
```toml
[dependencies]
anyhow = { version = "1" }
async-channel = { version = "2.1" }
axum = { version = "0.7.5", features = ["tokio", "ws"] }
clap = { version = "4.4", features = ["derive"] }
futures-util = { version = "0.3" }
tokio = { version = "1.0", features = ["full"] }
tower-http = { version = "0.5.0", features = ["fs", "trace"] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
```
You can verify that all of those pull and compile with the default `main.rs` that cargo generates; that way, compilation stays fast as you work.
Servicing Health Checks
We'll start off as simple as possible while keeping a mind for the future. We'll set up the HTTP server with just a single endpoint: the health check. All this endpoint needs to do is return `200 OK` when queried, which serves as a handy way to check on the service remotely.
Starting off with our entry point in the `main` function:
```rust
use axum::{extract::ConnectInfo, response::IntoResponse};
use std::net::SocketAddr;
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let port = 8080;
    serve_web_app(port).await?;

    Ok(())
}
```
First we set up `tracing` to report info-level logs, define a value for the port we'll be listening on, and immediately enter an async function for serving the web app, `serve_web_app`.
The meat of the implementation is in this little `serve_web_app` function:
```rust
async fn serve_web_app(port: u16) -> anyhow::Result<()> {
    let addr = SocketAddr::from(([0, 0, 0, 0], port));
    let listener = tokio::net::TcpListener::bind(addr).await?;
    tracing::info!("Listening on {}", listener.local_addr()?);

    let app = axum::Router::new()
        .route("/health_check", axum::routing::get(handle_health_check))
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(DefaultMakeSpan::default().include_headers(true)),
        );

    axum::serve(
        listener,
        app.into_make_service_with_connect_info::<SocketAddr>(),
    )
    .await?;

    Ok(())
}
```
We set up the server to listen on TCP on the specified port, specifically using IP address `0.0.0.0` as opposed to `127.0.0.1` (what's the difference?). This allows the server to be reached by other devices on the network, which you'll want in a production deployment for an IoT device, since you're likely accessing it from a different device.
The `axum::Router` uses the builder pattern to build up a router. We'll start with just our health check endpoint, serviced by a handler function called `handle_health_check` (shown later). Then we add a layer from the `tower_http` crate, which plugs the router into the `tracing` infrastructure we set up in `main`. Additionally, any `tracing` logs made in the handler functions get integrated into the larger request spans, which can be handy for debugging.
Finally, we plug the router into our TCP listener to run the app. The `into_make_service_with_connect_info` method gives handlers access to the peer address of the connection, which I find useful for debugging as well.
Finally, and perhaps rather undramatically, we get to the health check endpoint, which has a very simple scope after all the boilerplate is set up:
```rust
async fn handle_health_check(ConnectInfo(addr): ConnectInfo<SocketAddr>) -> impl IntoResponse {
    tracing::info!("Received health check from address: {addr}");
}
```
And that's all that's required to get started! Run it and access the endpoint with either `curl` or your web browser, and you should see some output like this:
```
$ RUST_LOG=debug cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.07s
     Running `/barge/cargo-target/debug/iot-server`
2024-05-06T21:15:02.745430Z  INFO iot_server: Listening on 0.0.0.0:8080
2024-05-06T21:15:06.308944Z DEBUG request{method=GET uri=/health_check version=HTTP/1.1 headers={"host": "127.0.0.1:8080", "user-agent": "curl/7.81.0", "accept": "*/*"}}: tower_http::trace::on_request: started processing request
2024-05-06T21:15:06.309026Z  INFO request{method=GET uri=/health_check version=HTTP/1.1 headers={"host": "127.0.0.1:8080", "user-agent": "curl/7.81.0", "accept": "*/*"}}: iot_server: Received health check from address: 127.0.0.1:58252
2024-05-06T21:15:06.309068Z DEBUG request{method=GET uri=/health_check version=HTTP/1.1 headers={"host": "127.0.0.1:8080", "user-agent": "curl/7.81.0", "accept": "*/*"}}: tower_http::trace::on_response: finished processing request latency=0 ms status=200
```
You can see the message we planted in the handler: "Received health check from address". You can also see that the logging coming from `tower_http` is quite verbose; I generally leave it turned off for that reason.
Serving WebSockets
With a lot of the boilerplate out of the way, we can get to the actual reason we’re here: setting up our connection between frontend and backend!
First we need to set up a little bit of application state that is accessible to the endpoint handlers. In particular, it's convenient to have a separate task that handles requests from the WebSocket connection, so that whenever an HTTP client requests an upgrade to a WebSocket connection, the spun-up task can access application information in an organized way that isn't tied to the transport layer.

We'll add two channels for bidirectional communication with this hypothetical message-handling task and pass half of each into the HTTP server.
In `main`:
```rust
use async_channel::{Receiver, Sender};

async fn main() -> anyhow::Result<()> {
    // --snip--
    let port = 8080;
    let (from_ws_tx, from_ws_rx) = async_channel::unbounded();
    let (to_ws_tx, to_ws_rx) = async_channel::unbounded();
    serve_web_app(port, from_ws_tx, to_ws_rx).await?;
    // --snip--
}
```
In `serve_web_app`:
```rust
#[derive(Clone)]
struct AppState {
    to_ws_handler: Receiver<Vec<u8>>,
    from_ws_handler: Sender<Vec<u8>>,
}

async fn serve_web_app(
    port: u16,
    from_ws_tx: Sender<Vec<u8>>,
    to_ws_rx: Receiver<Vec<u8>>,
) -> anyhow::Result<()> {
    // --snip--
    let app_state = AppState {
        to_ws_handler: to_ws_rx,
        from_ws_handler: from_ws_tx,
    };

    tracing::info!("Listening on {}", listener.local_addr()?);
    let app = axum::Router::new()
        .route("/health_check", axum::routing::get(handle_health_check))
        .with_state(app_state)
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(DefaultMakeSpan::default().include_headers(true)),
        );
    // --snip--
}
```
Adding the `with_state` method to the `axum::Router` allows passing a cloneable state object to each handler through its function parameters, similar to the `ConnectInfo<SocketAddr>` we used in the health check endpoint. (Axum refers to these as extractors, and it leverages destructuring like this.)
With that available, we can now add the endpoint for upgrading HTTP requests to WebSocket connections. This takes the form of just adding another route on the endpoint `/ws` (this can actually be whatever you'd like), which we service with the `handle_ws_connection` handler.
```rust
use axum::extract::{ws::WebSocket, State, WebSocketUpgrade};

async fn serve_web_app(
    port: u16,
    from_ws_tx: Sender<Vec<u8>>,
    to_ws_rx: Receiver<Vec<u8>>,
) -> anyhow::Result<()> {
    // --snip--
    let app = axum::Router::new()
        .route("/health_check", axum::routing::get(handle_health_check))
        .route("/ws", axum::routing::get(handle_ws_connection))
        .with_state(app_state)
        .layer(
            TraceLayer::new_for_http()
                .make_span_with(DefaultMakeSpan::default().include_headers(true)),
        );
    // --snip--
}

async fn handle_ws_connection(
    ws: WebSocketUpgrade,
    State(state): State<AppState>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> impl IntoResponse {
    tracing::info!("Client at {addr} requested a WebSocket connection");
    let AppState {
        to_ws_handler: to_ws,
        from_ws_handler: from_ws,
    } = state;
    ws.on_failed_upgrade(|err| tracing::error!("Failed to upgrade connection: {err}"))
        .on_upgrade(move |socket| socket_task(socket, from_ws, to_ws))
}
```
Handling a request on this endpoint primarily consists of trying to upgrade from an HTTP request to a persistent WebSocket connection. `axum` provides some convenient types for this, where you just provide a callback for success and a callback for failure. There isn't much to do on failure: just log and move on.
In the happy path, we set up the persistent async task responsible for shuffling messages between clients and the backend service, which can process those messages (and generate its own as responses or unprompted reports). Since the communication is bidirectional, we can just split the socket into two futures.
```rust
use axum::extract::ws::Message;
use futures_util::{
    stream::{SplitSink, SplitStream},
    SinkExt, StreamExt,
};

async fn socket_task(ws: WebSocket, from_ws_tx: Sender<Vec<u8>>, to_ws_rx: Receiver<Vec<u8>>) {
    let (sender, receiver) = ws.split();
    tokio::select! {
        _ = handle_incoming_ws_message(receiver, from_ws_tx) => {},
        _ = handle_outgoing_payloads(sender, to_ws_rx) => {},
    };
}

async fn handle_outgoing_payloads(
    mut sender: SplitSink<WebSocket, Message>,
    to_ws_rx: Receiver<Vec<u8>>,
) {
    while let Ok(msg) = to_ws_rx.recv().await {
        if let Err(err) = sender.send(Message::Binary(msg)).await {
            tracing::error!("Unable to send message to web client: {err}");
            break;
        }
    }
}
```
If we make the content of our channel communication the same as a message coming over the WebSocket connection, we can read messages straight out of the channel receiver and wrap them in the `Message::Binary` variant. There are a few different variants of `Message`, but we'll stick to `Binary` for our application-layer communication. The receiving side requires a little more handling due to the different types of messages we can receive.
```rust
async fn handle_incoming_ws_message(
    mut receiver: SplitStream<WebSocket>,
    from_ws_tx: Sender<Vec<u8>>,
) {
    while let Some(msg) = receiver.next().await {
        match msg {
            Ok(Message::Binary(msg)) => {
                tracing::debug!("Received {} bytes from client", msg.len());
                if from_ws_tx.send(msg).await.is_err() {
                    tracing::warn!("Closing WebSocket connection from the server side");
                    break;
                }
            }
            Ok(Message::Text(_)) => {
                tracing::warn!("Received unexpected text from WebSocket client");
            }
            Ok(Message::Ping(_) | Message::Pong(_)) => {}
            Ok(Message::Close(_)) => {
                tracing::info!("WebSocket client closed the connection");
                break;
            }
            Err(err) => {
                tracing::error!("Communication with WebSocket client interrupted: {err}");
                break;
            }
        }
    }
}
```
We treat the receiver side of the `WebSocket` handle just like a channel, but there are many different variants of message. `Message::Binary` is the type we'll stick to for application messages, so those are ferried along to the message handler over the channel; if that send fails, the channel is closed and we can exit the handler. `Message::Text` is an alternate variant we could use, but since we're not using it, we log a warning and ignore it. `Message::Ping` and `Message::Pong` are handled by the server, so we can ignore those as well. Finally, there is `Message::Close`, which a client can send before proactively disconnecting.
Quick Test
That was a lot of code in the last section. Before proceeding, it's good to do a quick sanity check that nothing has gone horribly wrong. For that, we can use `websocat`, a tool for sending and receiving data over WebSockets from the shell. You can install it with:

```
$ cargo install websocat
```
Run your server application in another shell, and then let's fire up `websocat` to send a message. You can try:

```
$ websocat ws://0.0.0.0:8080/ws
```
Then type a message followed by the Enter key, and you should see a warning in the application's trace logs: "Received unexpected text from WebSocket client". The WebSocket communication is working! You can send binary messages instead of text as well:
```
$ websocat --binary ws://0.0.0.0:8080/ws
hello world
```
And if you have debug logs enabled, you’ll see “Received 12 bytes from client”.
Handling Messages
As you can see above, debugging is currently a little awkward when all we have to go by are log messages. Messages that are sent ought to be visible and verifiable in some way. We'll wrap up Part 1 by having the task that's actually receiving the messages do some very basic handling.
Back in our `main` function, there is a warning about two unused variables: the other halves of our channels! We'll launch our message-handling task from `main` as well.
```rust
async fn main() -> anyhow::Result<()> {
    // --snip--
    let port = 8080;
    let (from_ws_tx, from_ws_rx) = async_channel::unbounded();
    let (to_ws_tx, to_ws_rx) = async_channel::unbounded();
    tokio::select! {
        web_app_result = serve_web_app(port, from_ws_tx, to_ws_rx) => { web_app_result? },
        handler_result = handle_messages(from_ws_rx, to_ws_tx) => { handler_result? },
    }
    Ok(())
}

async fn handle_messages(from_ws: Receiver<Vec<u8>>, to_ws: Sender<Vec<u8>>) -> anyhow::Result<()> {
    while let Ok(msg) = from_ws.recv().await {
        if let Ok(msg) = std::str::from_utf8(&msg[..]) {
            tracing::debug!("Received message: {msg}");
            let response = msg.to_uppercase();
            if to_ws.send(response.into_bytes()).await.is_err() {
                tracing::error!("Channel disconnected; exiting");
                break;
            }
        }
    }
    Ok(())
}
```
For now, the message handler behaves very simply: it tries to parse the payload as a UTF-8 encoded string, and if it succeeds, it responds with the uppercase form of that string:
```
$ websocat --binary ws://0.0.0.0:8080/ws
hello world
HELLO WORLD
```
We’ll turn this into something more useful in the next chapter, but that wraps up this post!
Summary
In this post, I covered how to set up an async Rust program which serves an HTTP server with an endpoint to check the liveness of the server and an endpoint for creating a WebSocket connection to the server. We piped messages from that connection to a message handler task which processed the message and responded.
In the next post, we’ll switch gears and use Yew to build a frontend application in Rust which can utilize this WebSocket communication to communicate with the backend.
If you’re following along and want to compare what code you’ve written with where I’m at so far, feel free to check out this branch of the repository on GitHub!
If you notice a dead link or have some other correction to make, feel free to make an issue on GitHub!