Understanding Concurrency in Rust

October 16, 2023
Written by
Joseph Udonsak
Contributor
Opinions expressed by Twilio contributors are their own
Reviewed by

Because Rust is statically typed and compiled, does not use garbage collection, and has zero-cost abstractions; application speed is a given. However, you can do even more by integrating concurrency into your application designs.

Concurrency is the ability to execute multiple tasks simultaneously, and this lies at the heart of high-performance applications, enabling them to handle complex operations efficiently. Rust makes working with concurrency a more pleasant experience.

By nature, concurrent operations are non-deterministic. As a result, debugging errors can be a nightmare. Rust resolves this by applying ownership and type checking rules at compile-time, instead of runtime. This makes it easier to detect and resolve errors.

In this article, I will show you how to take advantage of concurrency to reduce the execution time of your applications. To do this, I will show you how to build a Rust application which generates a usage report for your Twilio account.

Prerequisites

To follow this tutorial, you will need the following:

What you will build

You will be building an application that generates a usage report for your Twilio account. This report will be a spreadsheet with four sheets containing the following information:

  1. Account details
  2. All-time usage records
  3. Message records
  4. Call records

Let's get started

Where you create your Rust projects, create a new Rust project and change into it using the following commands.

cargo new concurrency_demo
cd concurrency_demo

Add the project dependencies

Update the dependencies section of the Cargo.toml file, in the project's top-level directory, to match the following.

dependencies]
dotenvy = "0.15"
reqwest = { version = "0.11.18"

Here’s what each added crate does:

  • Dotenvy: Dotenvy helps with loading environment variables. It is a well-maintained version of the dotenv crate.
  • Reqwest: This package will simplify sending API requests to Twilio.
  • rust_xlsxwriter: This package helps with writing Excel files in the xlsx format.
  • serde, serde_derive, and serde_json: These packages reduce the complexity of deserialising JSON responses from Twilio, so that they can be more easily used.

Set the required environment variables

Now, create a new file called .env in the project's top-level directory, and paste the following code into it.

TWILIO_ACCOUNT_SID="<<TWILIO_ACCOUNT_SID>>"
TWILIO_AUTH_TOKEN="<<TWILIO_AUTH_TOKEN>>"

After that, retrieve your Twilio Auth Token and Account SID from the Twilio Console Dashboard, and insert them in place of the respective placeholders.

Build the application

Now that you have your Twilio credentials, you’re set to start building the application. To make your code easy to follow, you’ll build separate modules to handle separate concerns.

Handle errors

This application will use the Result type to manage errors. There are three possible errors that could occur in the application, in the following scenarios:

  1. Processing an API request
  2. Deserializing the API response
  3. Writing the results to a spreadsheet

To handle these errors, you will create an enum with variants for each error. Your enum will also implement the Error and Display traits.

In the src folder at the root of your project, create a new file named error.rs and add the following code to it.

use std::error::Error;
use std::fmt::{Display, Formatter};
use rust_xlsxwriter::XlsxError;

#[derive(Debug)]
pub enum ReportGenerationError {
    RetrieveRecordError(reqwest::Error),
    ParseRecordError(serde_json::Error),
    WriteReportError(XlsxError),
}

impl From<XlsxError> for ReportGenerationError {
    fn from(e: XlsxError) -> Self {
        Self::WriteReportError(e)
    }
}

impl From<reqwest::Error> for ReportGenerationError {
    fn from(e: reqwest::Error) -> Self {
        Self::RetrieveRecordError(e)
    }
}

impl From<serde_json::Error> for ReportGenerationError {
    fn from(e: serde_json::Error) -> Self {
        Self::ParseRecordError(e)
    }
}

impl Error for ReportGenerationError {}

impl Display for ReportGenerationError {
    fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
        match self {
            ReportGenerationError::RetrieveRecordError(e) => { write!(f, "error retrieving results {}", e) }
            ReportGenerationError::ParseRecordError(e) => { write!(f, "error parsing results {}", e) }
            ReportGenerationError::WriteReportError(e) => { write!(f, "error generating report {}", e) }
        }
    }
}

In this code, you started by creating an enum named ReportGenerationError which encapsulates the possible errors that can occur. Next, you added implementations for the From trait, which allows you convert the library specific errors to your custom error type. Finally, you implemented the Error and Display traits.

With this, you can declare the return type of your function signatures as Result<T, ReportGenerationError> where T is a generic type. This will simplify your error handling as you can use the ? keyword after your function calls.

Declare the models

In the src folder, create a new file named model.rs and add the following code to it.

use serde_derive::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Clone)]
pub struct AccountDetails {
    pub friendly_name: String,
    pub sid: String,
    pub date_created: String,
    pub status: String,
    #[serde(rename = "type")]
    pub account_type: String,
}

impl AccountDetails {
    pub fn empty() -> Self {
        AccountDetails {
            friendly_name: String::from(""),
            sid: String::from(""),
            date_created: String::from(""),
            status: String::from(""),
            account_type: String::from(""),
        }
    }
}

#[derive(Deserialize, Serialize, Clone)]
pub struct UsageRecord {
    pub account_sid: String,
    pub category: String,
    pub description: String,
    pub usage_unit: Option<String>,
    pub usage: String,
    pub price: String,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct MessageRecord {
    pub sid: String,
    pub account_sid: String,
    pub date_created: String,
    pub date_sent: String,
    pub from: String,
    pub to: String,
    pub status: String,
    pub num_segments: String,
    pub num_media: String,
    pub price: Option<String>,
}

#[derive(Deserialize, Serialize, Clone)]
pub struct CallRecord {
    pub sid: String,
    pub account_sid: String,
    pub date_created: String,
    pub from: String,
    pub to: String,
    pub status: String,
    pub start_time: String,
    pub end_time: String,
    pub price: Option<String>,
}

These structs are modelled after the structure of responses from the Twilio API, which will be written to the spreadsheet.

Retrieve your usage details from Twilio

At the moment, there’s no Twilio SDK available for Rust. However, using Twilio’s Programmable Messaging API, it is possible to send JSON requests for the purpose of retrieving account information. The next module you will add will contain the relevant functions to simplify the process of retrieving your account information from the Twilio API.

In the src folder, create a new file named twilio.rs and add the following code to it.

use std::env;

use serde_json::Value;

use crate::error::ReportGenerationError;
use crate::model::{AccountDetails, CallRecord, MessageRecord, UsageRecord};

fn make_request(endpoint: &str) -> Result<String, ReportGenerationError> {
    let twilio_account_sid =
        env::var("TWILIO_ACCOUNT_SID").expect("Twilio Account SID could not be retrieved.");
    let twilio_auth_token =
        env::var("TWILIO_AUTH_TOKEN").expect("Twilio Auth Token could not be retrieved.");

    let client = reqwest::blocking::Client::new();

    let request_url = format!("https://api.twilio.com/2010-04-01/Accounts/{twilio_account_sid}{endpoint}.json?PageSize=1000");
    let response = client
        .get(request_url)
        .basic_auth(twilio_account_sid, Some(twilio_auth_token))
        .send()?;
    let response_body = response.text()?;
    Ok(response_body)
}

pub fn get_account_details() -> Result<AccountDetails, ReportGenerationError> {
    let response = make_request("")?;
    let account_record: AccountDetails = serde_json::from_str(&response)?;
    Ok(account_record)
}

pub fn get_call_records() -> Result<Vec<CallRecord>, ReportGenerationError> {
    let response = make_request("/Calls")?;
    let mut response: Value = serde_json::from_str(&response)?;
    let call_records: Vec<CallRecord> = serde_json::from_value(response["calls"].take())?;
    Ok(call_records)
}

pub fn get_message_records() -> Result<Vec<MessageRecord>, ReportGenerationError> {
    let response = make_request("/Messages")?;
    let mut response: Value = serde_json::from_str(&response)?;
    let message_records: Vec<MessageRecord> = serde_json::from_value(response["messages"].take())?;
    Ok(message_records)
}

pub fn get_usage_records() -> Result<Vec<UsageRecord>, ReportGenerationError> {
    let response = make_request("/Usage/Records")?;
    let mut response: Value = serde_json::from_str(&response)?;
    let usage_records: Vec<UsageRecord> = serde_json::from_value(response["usage_records"].take())?;
    Ok(usage_records)
}

The make_request function is used to send requests to the Twilio API. The API response will be returned as a string (in the absence of any error). This function takes one argument which is a string slice named endpoint. This string is appended to the base URL for the Twilio API in order to get the specified resource.

Next, the get_account_details() function makes a request to the base endpoint which returns the account details for the provided Twilio Auth Token and Account SID. The returned string is parsed into an AccountDetails struct and returned accordingly.

The get_call_records(), get_message_records(), and  get_usage_records() functions work in a similar manner. First, a request is made using the make_request() function (providing the appropriate endpoint). Next, instead of parsing the returned string directly, the returned string is converted to a Value object. Then the appropriate key in the value is retrieved and parsed to get the required vector of objects.

Write the results to a spreadsheet

The next thing you’ll implement is the functionality to write the retrieved records to a spreadsheet. In the src folder, create a new file named writer.rs and add the following code to it.

use rust_xlsxwriter::{Format, FormatAlign, FormatBorder, RowNum, Workbook};

use crate::error::ReportGenerationError;
use crate::model::{AccountDetails, CallRecord, MessageRecord, UsageRecord};

pub fn write_results(account_details: AccountDetails, usage_records: Vec<UsageRecord>, message_records: Vec<MessageRecord>, call_records: Vec<CallRecord>) -> Result<(), ReportGenerationError> {
    let mut workbook = Workbook::new();

    write_account_details(&mut workbook, account_details)?;
    write_usage_records(&mut workbook, usage_records)?;
    write_message_records(&mut workbook, message_records)?;
    write_call_records(&mut workbook, call_records)?;

    workbook.save("Usage report.xlsx")?;

    Ok(())
}

fn write_account_details(workbook: &mut Workbook, account_details: AccountDetails) -> Result<(), ReportGenerationError> {
    let worksheet = workbook.add_worksheet();
    worksheet.set_name("Account")?;

    let header_format = get_header_format();

    worksheet.write_with_format(0, 0, "Friendly name", &header_format)?;
    worksheet.write_with_format(0, 1, "SID", &header_format)?;
    worksheet.write_with_format(0, 2, "Date created", &header_format)?;
    worksheet.write_with_format(0, 3, "Status", &header_format)?;
    worksheet.write_with_format(0, 4, "Type", &header_format)?;

    let row_format = get_row_format();

    worksheet.write_with_format(1, 0, account_details.friendly_name, &row_format)?;
    worksheet.write_with_format(1, 1, account_details.sid, &row_format)?;
    worksheet.write_with_format(1, 2, account_details.date_created, &row_format)?;
    worksheet.write_with_format(1, 3, account_details.status, &row_format)?;
    worksheet.write_with_format(1, 4, account_details.account_type, &row_format)?;

    worksheet.autofit();

    Ok(())
}

fn write_usage_records(workbook: &mut Workbook, usage_records: Vec<UsageRecord>) -> Result<(), ReportGenerationError> {
    let worksheet = workbook.add_worksheet();
    worksheet.set_name("All time usage")?;

    let header_format = get_header_format();

    worksheet.write_with_format(0, 0, "S/N", &header_format)?;
    worksheet.write_with_format(0, 1, "Account SID", &header_format)?;
    worksheet.write_with_format(0, 2, "Category", &header_format)?;
    worksheet.write_with_format(0, 3, "Description", &header_format)?;
    worksheet.write_with_format(0, 4, "Usage unit", &header_format)?;
    worksheet.write_with_format(0, 5, "Usage", &header_format)?;
    worksheet.write_with_format(0, 6, "Price", &header_format)?;

    let row_format = get_row_format();

    for (i, record) in usage_records.iter().enumerate() {
        let row_index: RowNum = (i + 1) as RowNum;
        worksheet.write_with_format(row_index, 0, row_index, &row_format)?;
        worksheet.write_with_format(row_index, 1, &record.account_sid, &row_format)?;
        worksheet.write_with_format(row_index, 2, &record.category, &row_format)?;
        worksheet.write_with_format(row_index, 3, &record.description, &row_format)?;
        if let Some(usage_unit) = &record.usage_unit {
            worksheet.write_with_format(row_index, 4, usage_unit, &row_format)?;
        } else {
            worksheet.write_with_format(row_index, 4, "", &row_format)?;
        }
        worksheet.write_with_format(row_index, 5, &record.usage, &row_format)?;
        worksheet.write_with_format(row_index, 6, &record.price, &row_format)?;
    }
    worksheet.autofit();
    Ok(())
}

fn write_message_records(workbook: &mut Workbook, message_records: Vec<MessageRecord>) -> Result<(), ReportGenerationError> {
    let worksheet = workbook.add_worksheet();
    worksheet.set_name("Messages")?;

    let header_format = get_header_format();

    worksheet.write_with_format(0, 0, "S/N", &header_format)?;
    worksheet.write_with_format(0, 1, "SID", &header_format)?;
    worksheet.write_with_format(0, 2, "Account SID", &header_format)?;
    worksheet.write_with_format(0, 3, "Date created", &header_format)?;
    worksheet.write_with_format(0, 4, "Date sent", &header_format)?;
    worksheet.write_with_format(0, 5, "From", &header_format)?;
    worksheet.write_with_format(0, 6, "To", &header_format)?;
    worksheet.write_with_format(0, 7, "Status", &header_format)?;
    worksheet.write_with_format(0, 8, "Segments", &header_format)?;
    worksheet.write_with_format(0, 9, "Media", &header_format)?;
    worksheet.write_with_format(0, 10, "Price", &header_format)?;

    let row_format = get_row_format();
    for (i, record) in message_records.iter().enumerate() {
        let row_index: RowNum = (i + 1) as RowNum;
        worksheet.write_with_format(row_index, 0, row_index, &row_format)?;
        worksheet.write_with_format(row_index, 1, &record.sid, &row_format)?;
        worksheet.write_with_format(row_index, 2, &record.account_sid, &row_format)?;
        worksheet.write_with_format(row_index, 3, &record.date_created, &row_format)?;
        worksheet.write_with_format(row_index, 4, &record.date_sent, &row_format)?;
        worksheet.write_with_format(row_index, 5, &record.from, &row_format)?;
        worksheet.write_with_format(row_index, 6, &record.to, &row_format)?;
        worksheet.write_with_format(row_index, 7, &record.status, &row_format)?;
        worksheet.write_with_format(row_index, 8, &record.num_segments, &row_format)?;
        worksheet.write_with_format(row_index, 9, &record.num_media, &row_format)?;
        if let Some(price) = &record.price {
            worksheet.write_with_format(row_index, 10, price, &row_format)?;
        } else {
            worksheet.write_with_format(row_index, 10, "", &row_format)?;
        }
    }

    worksheet.autofit();
    Ok(())
}

fn write_call_records(workbook: &mut Workbook, call_records: Vec<CallRecord>) -> Result<(), ReportGenerationError> {
    let worksheet = workbook.add_worksheet();
    worksheet.set_name("Calls")?;

    let header_format = get_header_format();

    worksheet.write_with_format(0, 0, "S/N", &header_format)?;
    worksheet.write_with_format(0, 1, "SID", &header_format)?;
    worksheet.write_with_format(0, 2, "Account SID", &header_format)?;
    worksheet.write_with_format(0, 3, "Date created", &header_format)?;
    worksheet.write_with_format(0, 4, "From", &header_format)?;
    worksheet.write_with_format(0, 5, "To", &header_format)?;
    worksheet.write_with_format(0, 6, "Status", &header_format)?;
    worksheet.write_with_format(0, 7, "Start time", &header_format)?;
    worksheet.write_with_format(0, 8, "End time", &header_format)?;
    worksheet.write_with_format(0, 9, "Price", &header_format)?;

    let row_format = get_row_format();

    for (i, record) in call_records.iter().enumerate() {
        let row_index: RowNum = (i + 1) as RowNum;
        worksheet.write_with_format(row_index, 0, row_index, &row_format)?;
        worksheet.write_with_format(row_index, 1, &record.sid, &row_format)?;
        worksheet.write_with_format(row_index, 2, &record.account_sid, &row_format)?;
        worksheet.write_with_format(row_index, 3, &record.date_created, &row_format)?;
        worksheet.write_with_format(row_index, 4, &record.from, &row_format)?;
        worksheet.write_with_format(row_index, 5, &record.to, &row_format)?;
        worksheet.write_with_format(row_index, 6, &record.status, &row_format)?;
        worksheet.write_with_format(row_index, 7, &record.start_time, &row_format)?;
        worksheet.write_with_format(row_index, 8, &record.end_time, &row_format)?;
        if let Some(price) = &record.price {
            worksheet.write_with_format(row_index, 9, price, &row_format)?;
        } else {
            worksheet.write_with_format(row_index, 9, "", &row_format)?;
        }
    }

    worksheet.autofit();
    Ok(())
}

fn get_header_format() -> Format {
    Format::new()
        .set_align(FormatAlign::Center)
        .set_border(FormatBorder::Thick)
        .set_bold()
}

fn get_row_format() -> Format {
    Format::new()
        .set_align(FormatAlign::Justify)
        .set_border(FormatBorder::Thin)
}

The write_account_details(),write_usage_records(), write_message_records(), and write_call_records() functions are used to write the corresponding records to a separate sheet in the Excel file. Each function follows the following pattern:

  1. Create a new sheet and give it a name.
  2. Write the headers for the sheet and format them using the return value of the get_header_format() function.
  3. Iterate through the records, write the appropriate cell value for each record, and format using the return value of the get_row_format() function.
  4. Adjust the cell sizes to make sure that the content is displayed properly. This is done using the autofit() function.

The only public function in this module is the write_results(), which takes all the records (account details, usage records, message records, and call records), creates a new file, and passes each record to the appropriate function to be written accordingly. Once this is done, the file is saved and closed.

Generate a report synchronously

To generate the report synchronously, the records are retrieved one after the other via the previously declared helper functions. Once the records are available, pass them to the write_results() function you declared earlier.

To do this, add the following function to the main.rs file in the src folder.

pub fn generate_report_synchronously() -> Result<(), ReportGenerationError> {
    let account_details = twilio::get_account_details()?;
    let usage_records = twilio::get_usage_records()?;
    let message_records = twilio::get_message_records()?;
    let call_records = twilio::get_call_records()?;

    write_results(account_details, usage_records, message_records, call_records)?;

    Ok(())
}

Remember to update your imports and module declarations, as shown below.

use crate::error::ReportGenerationError;
use crate::writer::write_results;

mod model;
mod twilio;
mod writer;
mod error;

Generate a report concurrently

The concurrent version will use threads to make the relevant API calls. These threads will update placeholders for the respective values. Once all threads have run successfully, the records are written to the spreadsheet. To do this, add the following function to the main.rs file in the src folder.

pub fn generate_report_concurrently() -> Result<(), ReportGenerationError> {
    let account_record = Arc::new(Mutex::new(AccountDetails::empty()));
    let usage_records = Arc::new(Mutex::new(vec![]));
    let message_records = Arc::new(Mutex::new(vec![]));
    let call_records = Arc::new(Mutex::new(vec![]));

    let mut handles = vec![];

    let cloned_record = account_record.clone();
    handles.push(thread::spawn(move || {
        let mut record = cloned_record.lock().unwrap();
        *record = twilio::get_account_details().unwrap();
    }));

    let cloned_usage = usage_records.clone();
    handles.push(thread::spawn(move || {
        let mut result = cloned_usage.lock().unwrap();
        *result = twilio::get_usage_records().unwrap();
    }));

    let cloned_messages = message_records.clone();
    handles.push(thread::spawn(move || {
        let mut result = cloned_messages.lock().unwrap();
        *result = twilio::get_message_records().unwrap();
    }));

    let cloned_calls = call_records.clone();
    handles.push(thread::spawn(move || {
        let mut result = cloned_calls.lock().unwrap();
        *result = twilio::get_call_records().unwrap();
    }));

    for handle in handles {
        handle.join().unwrap();
    }

    let account_details = account_record.lock().unwrap().to_owned();
    let usage_records = usage_records.lock().unwrap().to_vec();
    let message_records = message_records.lock().unwrap().to_vec();
    let call_records = call_records.lock().unwrap().to_vec();

   write_results(
        account_details, usage_records.to_vec(), message_records, call_records,
    )?;

    Ok(())
}

Update the imports to match the following.

use std::sync::{Arc, Mutex};
use std::thread;

use crate::error::ReportGenerationError;
use crate::model::AccountDetails;
use crate::writer::write_results;

This function starts off by declaring empty state values for the records you want to get. However, these values are wrapped with some structs. The Arc and Mutex structs are provided by Rust to guard against two key pitfalls of concurrency: race conditions and deadlocks. Arc is an atomic reference counter, which allows multiple threads to update a shared value while Mutex guards the wrapped resource and ensures that only one thread can access the resource at a time.

Because of  the handle.join().unwrap(); call, the main thread will wait for the secondary threads to run before proceeding.

Finally, the updated values are retrieved and passed to the write_results() function, as usual.

Compare the results

Rust’s built-in benchmark tests are only available on Nightly Rust because they use unstable features, so you will compare the results of both functions by measuring how long they take to execute.

To do this, update the main() function in src/main.rs to match the following.

fn main() -> Result<(), ReportGenerationError> {
    dotenv().ok();
    println!("Starting synchronous execution");
    let start = Instant::now();
    generate_report_synchronously()?;
    println!("Synchronous Execution Time: {:?}", start.elapsed());

    println!("Starting concurrent execution");
    let start = Instant::now();
    generate_report_concurrently()?;
    println!("Concurrent Execution Time: {:?}", start.elapsed());

    Ok(())
}

Remember to add the following to the imports list at the top of main.rs.

use std::time::Instant;
use dotenvy::dotenv;

Before calling either function, the application environment variables are loaded. Next, before each function call, the current time is captured using an Instant. After the function runs, the execution time is printed by calling the elapsed() function.

Run your application using the following command.

cargo run main.go

You should see something similar to the following as a response

Starting synchronous execution
Synchronous Execution Time: 8.687679908s
Starting concurrent execution
Concurrent Execution Time: 5.271609783s

While the results will vary from yours (depending on internet speed and the processing power of your workstation), you will see that the concurrent execution always performs considerably better.

You will also see a new Excel spreadsheet in the project's top-level directory, named Usage report.xlsx, containing the results returned from your Twilio account.

Now you understand the essentials of concurrency in Rust.

Well done for coming this far!! Rust has a bad rep for being verbose and difficult to learn, but once you understand what is going on, you can see that it’s not that difficult.

This is just a tip of the iceberg in terms of what Rust has to offer for concurrent applications, you can go even further by taking advantage of channels.

The entire codebase is available on GitHub, should you get stuck at any point. I’m excited to see what else you come up with. Until next time, make peace not war ✌🏾

Joseph Udonsak is a software engineer with a passion for solving challenges – be it building applications, or conquering new frontiers on Candy Crush. When he’s not staring at his screens, he enjoys a cold beer and laughs with his family and friends. Find him at LinkedIn, Medium, and Dev.to.