Since we have written this blog post actix 0.6 was released with several breaking changes. The content of this blog post is conceptually still relevant though and the code examples should work fine if you make sure to use the correct dependency versions.
anchorThe Goal
Our goal in this blog post is to build an Actor
that connects to a certain TCP server, listens for new messages and forwards them to another recipient
actor. For simplicity we'll assume that the server is using a line break after each message.
anchorProject Setup
In our last post we explained that the easiest solution to install Rust is currently rustup.rs. Once you have followed the instructions there you should have access to the cargo
build tool on your command line.
Creating a new project for Rust is easy with cargo
:
cargo new actix-tcp-example
This will create a new binary project for you with the basic file structures already in place. The cargo new
command also supports a --lib
option, which will create a library project instead. The difference between them is that binary projects have a main()
function and get compiled to executables, while library projects are used to share code and functionality at crates.io.
Fun fact: did you know that crates.io is an open-source project itself and built with Rust and Ember.js?
Now that we have created the project let's see if it runs:
cargo run
If everything went well you should see cargo
compiling the project and afterwards running it, resulting in "Hello, world!" being printed in your Terminal.
anchorThe Actor
Before we can start to implement our TCP client actor we need to tell cargo
about the actix
dependency. For that we open the Cargo.toml
file in the project and add the following line after the [dependencies]
declaration:
actix = "0.5"
Now, whenever we build the project again, cargo
will make sure to download the necessary dependencies from crates.io and build them before building the actual project. cargo
is using a build cache though, so don't worry if it takes a long time if you first try to build it. The following builds should be much faster.
It's time to implement a basic Actor
as the base for our experiment. We don't want to worry too much about project organization for now, so let's implement it right in the src/main.rs
file:
extern crate actix;
use actix::prelude::*;
struct TcpClientActor;
impl Actor for TcpClientActor {
type Context = Context<Self>;
}
First we tell Rust that the actix
module is coming from an external crate. Next we import the most important actix
traits with the use
keyword. And finally we implement the most basic Actor
possible.
Starting cargo run
again will show us the following warning though:
warning: struct is never used: `TcpClientActor`
--> src/main.rs:5:1
|
5 | struct TcpClientActor;
| ^^^^^^^^^^^^^^^^^^^^^^
|
= note: #[warn(dead_code)] on by default
... which makes sense since we haven't actually used the actor anywhere yet. Let's change that! We'll update the main()
function with a few lines of code that should start our actor and then wait for it to shutdown at some point:
fn main() {
let sys = actix::System::new("tcp-test");
let _tcp_client: Addr<Syn, _> = Arbiter::start(|_| TcpClientActor);
sys.run();
}
Let's also add a started
hook to our actor, so that we can see when it starts:
impl Actor for TcpClientActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("TcpClientActor started!");
}
}
If we use cargo run
again, the "TcpClientActor started!" message should appear on the screen, and the app should keep running because we have implemented nothing that would stop the actor.
First milestone achieved! 🎉
anchorDNS resolution and TCP connection
Now that we have setup the basic scaffolding of our Actor
, let's try to connect to a real TCP server. I'll first show you the code and then we'll discuss what it does step-by-step:
use actix::actors::{Connect, Connector};
// ...
fn started(&mut self, ctx: &mut Self::Context) {
println!("TcpClientActor started!");
Connector::from_registry()
.send(Connect::host("towel.blinkenlights.nl:23"))
.into_actor(self)
.map(|res, _act, ctx| match res {
Ok(stream) => {
println!("TcpClientActor connected!");
}
Err(err) => {
println!("TcpClientActor failed to connected: {}", err);
ctx.stop();
}
})
.map_err(|err, _act, ctx| {
println!("TcpClientActor failed to connected: {}", err);
ctx.stop();
})
.wait(ctx);
}
The first new thing here is Connector::from_registry()
. actix
comes with a small number of built-in actors and one of them is the so-called Connector
. The Connector
actor can be used to perform DNS lookups, or connect to remote servers via TCP, which is exactly what we want!
The Connector::from_registry()
function returns an Addr
instance for the Connector
and we can use the send()
method on it to send a Connect
message and wait for the answer (using a Future
).
Next we use the into_actor()
modifier to transform the Future
into another kind of Future
, where we get passed not only the result, but also the actor instance and the context in any callbacks that we implement. This is helpful to work around the Rust compiler complaining about lifetime issues.
Now we're ready to use the result and we do so by implementing a callback for the map()
method. We'll keep it simple for now and just print a message to the terminal whether the connection was successful or not.
Since we have to handle two different kinds of errors here (ConnectorError
and MailboxError
) we have to implement two different error handlers too. They have the same code but since the error classes are different, they can't easily share the same implementation unfortunately.
Finally, we block the current thread using the wait()
method until the Future
is resolved.
As usual, we will cargo run
again and if everything works properly we should see "TcpClientActor connected!" being printed to the Terminal.
Great! We have successfully opened up a TCP connection to the towel.blinkenlights.nl
server and if you're curious what kind of server that is: keep reading! 😉
anchorReading TCP streams
For the next part we'll need two more dependencies: tokio-io
and tokio-codec
. tokio is the low-level network IO library that actix
is using underneath.
The version of actix
that we are currently using (v0.5.8) has dependencies on v0.1 of the tokio
libraries, so to avoid unnecessary conflicts we will use the same versions. Let's add the new dependencies to the Cargo.toml
file:
tokio-codec = "0.1"
tokio-io = "0.1"
and the necessary extern crate
definitions to the top of the src/main.rs
file:
extern crate tokio_codec;
extern crate tokio_io;
We will also add the imports that we'll soon use already:
use tokio_codec::{FramedRead, LinesCodec};
use tokio_io::AsyncRead;
You may have noticed that when we ran cargo run
earlier the compiler was complaining about the stream
variable not being used. Let's fix that by reading something from the TCP stream.
A TcpStream
in the context of tokio
is the wording for the TCP connection between a client and a server, and includes both the read and write parts of the connection. Since we only care about the read part for now we should split the stream into the two parts and focus on the read part for now:
Ok(stream) => {
println!("TcpClientActor connected!");
let (r, w) = stream.split();
}
Now we have a read part (r
) that we can call read()
on and a write part (w
) that we could call write()
on. Unfortunately the Read and Write traits only operate on u8
arrays though, so for our purpose of reading strings separated by line breaks this isn't quite a user friendly as we would like.
Fortunately, tokio
has a solution for that:
let line_reader = FramedRead::new(r, LinesCodec::new());
This combination of LinesCodec
and FramedRead
from the tokio-codecs
crate can be used to work with the Read
trait in a more comfortable way. The FramedRead
instance acts as an asynchronous stream of values that we could call poll()
on, or we'll attach it to the actor in a more convenient way by implementing the StreamHandler
trait from actix
:
impl StreamHandler<String, std::io::Error> for TcpClientActor {
fn handle(&mut self, line: String, _ctx: &mut Self::Context) {
println!("{}", line);
}
}
and then connecting all the pieces together via:
ctx.add_stream(line_reader);
right below the line that constructs the FramedRead
instance.
I won't spoiler anything, but take a bit of time and see what happens now if you start cargo run
again... 🚀
anchorForward messages to other actors
In case you're still reading, let's implement the final part of our goal: forwarding received messages to another actor.
Before we can forward any messages we first need to define how such a message will look like. As explained in the last blog post about actix we can do so by implementing the Message
trait, or deriving it automatically in most cases:
#[derive(Message)]
pub struct ReceivedLine {
pub line: String,
}
Now that we know how the message looks like we should implement a second actor that can receive such messages and print them to the terminal:
pub struct ConsoleLogger;
impl Actor for ConsoleLogger {
type Context = Context<Self>;
}
impl Handler<ReceivedLine> for ConsoleLogger {
type Result = ();
fn handle(&mut self, message: ReceivedLine, _ctx: &mut Context<Self>) {
println!("{}", message.line);
}
}
At this point the above implementation should contain no surprises anymore. The ConsoleLogger
actor implementation looks roughly like the other actors we have already implemented in this and the previous blog post. All it does is listen for ReceivedLine
messages, and print them to the terminal once received.
Next, we will need to adjust our TcpClientActor
implementation to no longer print messages by itself, but instead forward them to the second actor. For that the TcpClientActor
needs to know where to send them. We could save the Addr<_, ConsoleLogger>
instance in the TcpClientActor
struct, but that would create a tight coupling between those actors, and we want the TcpClientActor
implementation to be as generic as possible.
The more generic solution is to use the Recipient
struct of actix
:
struct TcpClientActor {
recipient: Recipient<Syn, ReceivedLine>,
}
Afterwards we change the StreamHandler
implementation to this:
impl StreamHandler<String, io::Error> for TcpClientActor {
fn handle(&mut self, line: String, _ctx: &mut Self::Context) {
if let Err(error) = self.recipient.do_send(ReceivedLine { line }) {
println!("do_send failed: {}", error);
}
}
}
This implementation will try to send a ReceivedLine
message to the recipient
and print an error to the terminal if it fails.
The final missing piece now is to start the ConsoleLogger
actor in the main()
function and pass its address to the TcpClientActor
:
fn main() {
let sys = actix::System::new("tcp-test");
let _logger: Addr<Syn, _> = Arbiter::start(|_| ConsoleLogger);
let _tcp_client: Addr<Syn, _> = Arbiter::start(|_| {
TcpClientActor { recipient: _logger.recipient() }
});
sys.run();
}
After everything is assembled back together let's use cargo run
again and we will see that everything still works! 🎥
anchorSummary
We hope that by now you're a little more comfortable around actors and how to use and implement them in Rust. This blog post has shown how two actors can work together without any tight coupling other than the messages they exchange. We have also demonstrated how to use an actor as a TCP client, which can be used as a gateway for other actors that want to share the same connection to the server. In the next blog post of this series we will explore how to send messages and keep connections and actors alive using the Supervisor
.
👋