Перейти к основному содержимому

Чат в терминале на Rust

· 39 мин. чтения
Иван Давыдов

Эта статья — туториал по написанию небольшого чат сервиса (серверное и клиентское приложения) на Rust, используя функционал TCP сокетов из стандартной библиотеки Rust. Сам чат для пользователя будет выглядеть, как приложение в терминале. Полный код приложений есть в гитхабе.

demo.gif

к сведению

Ссылка на статью на хабре – https://habr.com/ru/articles/728870/

Начало

подсказка

Много объяснений будет записано в качестве комментариев к коду.

У нас будет 2 приложения: сервер, который будет принимать сообщения и раздавать их всем пользователям, подключённым к чату, и клиент, который будет показывать юзеру сообщения полученные от сервера и отправлять серверу сообщения от юзера. Создать шаблон для этих приложений можно через cargo new <name>. После этого нашим приложениям надо прописать базовые состояния и типы, которые они будут использовать на протяжении всей свой работы.

Для сервера начнём со структуры Settings. Она будет парсить и сохранять аргументы пользователя при запуске программы. Для парсинга будет использоваться clap.

Код структуры Settings на сервере (Файл server/src/settings.rs)
server/src/settings.rs
// Импортирование нужного трейта из clap
use clap::Parser;

// Объявление того, что будет парсится в качестве аргументов.
// С помощью derive макроса можно навесить на структуру макрос импортированного
// трейта в качестве атрибута этой структуры. В нашем случае таким образом
// будет сгенерирован нужный impl c функционалом Parser'а для структуры Args.
#[derive(Parser)]
pub struct Args {
// Макрос arg так же импортируется из clap автоматически
// и позволяет объявить поле аргументом и задать ему нужные свойства.
// short означает, что аргумент можно будет вписать сокращённо
// вот так "-p 8080". long, что можно использовать
// полное название "--port 8080". А help это просто вспомогательный
// текст, который будет показываться при запуске приложения с --help
#[arg(short, long, help = "Port that the server will serve")]
pub port: u16,
}

// Трейт Debug позволяет удобно выводить структуру через print в консоль, а
// Clone добавляет функционал для клонирования инстансов структуры.
// В derive мы опять передаём макросы этих трейтов чтобы они сгенерировали нам
// impl'ы с реализациями Debug и Clone, чтобы вручную не писать это.
#[derive(Debug, Clone)]
pub struct Settings {
pub port: u16,
}

// Внутри impl прописываются методы структуры
impl Settings {
pub fn new() -> Settings {
// используем метод от трейта Parser
let args = Args::parse();

// Создаём инстанс структуры Settings и возвращаем его
Settings {
port: args.port,
}
}
}

Добавим создание объекта Settings в наш main.rs. После этого при запуске приложения будут запрашиваться аргументы, а нашем случае — порт сервера.

Код server/src/main.rs
server/src/main.rs
// anyhow это небольшая библиотека, которая добавляет enum Result, 
// почти аналогичный Result'у из std, с единственным отличием, что этот
// может принимать любую ошибку
use anyhow::Result;

// Импортирование нашей новой структуры
use settings::Settings;

// Обязательное указание модуля, иначе файл виден не будет
mod settings;

fn main() -> Result<()> {
// Создание инстанса нстроек
let settings = Settings::new();

// возвращение Result::Ok() значения
Ok(())
}

Следующее, что нужно сделать это структуру состояния (State) нашего серверного приложения. Так как сервер, будет работать сразу в несколько потоков, то и состояние должно поддерживать многопоточность. Для этого внутри структуры данные завёрнуты в Arc и Mutex, подробнее в коде.

Код структуры State на сервере (Файл server/src/state.rs)
server/src/state.rs
use std::{
// Arc (Atomic Reference Counter) это smart pointer, который реализует
// множественное владение переменной. То-есть, грубо говоря, данные на
// которые указывает Arc не исчезнут пока есть
// хотя бы один клон этого Arc'а. По сути, то же самое делает и
// Rc (Reference Counter), но Rc не поддерживает многопоточность.
sync::Arc,
collections::HashMap
};

// Это аналог Mutex'а из стандартной библиотеки, но работающий намного быстрее.
// Сам Mutex это структура, которая блокируется для доступа из других потоков,
// если в одном из них она уже используется. И соответственно после использования
// она становится доступна для других потоков. Это нужно для того чтобы не было
// рассинхрона данных между потоками.
use parking_lot::{Mutex, MutexGuard};

use crate::settings::Settings;

// Каждый юзер после подключения будет записываться в стейт,
// структура UserData описывает, что будет хранить в себе запись
// о подключенном юзере.
#[derive(Debug, Clone)]
pub struct UserData {
// Ip адрес подключённого пользователя + его сокета
pub address: String,
}

#[derive(Debug, Clone)]
pub struct StateData {
// Настройки приложения, которые мы описали ранее
pub settings: Settings,
// HashMap'а хранящая данные о подключённых юзерах, где ключ это никнейм,
// значение это UserData
pub users: HashMap<String, UserData>,
}

// Arc, как я писал выше реализует множественно владение данными, но он не
// позволяет эти данные менять. Для этого чтобы это было возможно и безопасно мы
// дополнительно оборачиваем StateData в Mutex.
pub struct State(Arc<Mutex<StateData>>);

impl State {
pub fn new(settings: Settings) -> State {
State(
Arc::new(Mutex::new(StateData {
settings,
users: HashMap::new()
}))
)
}

// Метод для упрощения доступа к данным. Он блокирует Mutex для работы с
// данными только в текущем потоке. И возвращает MutexGuard. Пока MutexGuard
// жив другие потоки не смогут заблокировать данные для себя.
pub fn get(&self) -> MutexGuard<StateData> {
self.0.lock()
}
}

// Реализация трейта Clone для State. Просто повесить макрос трейта Clone
// через derive не получится, потому что копировать нужно внутренний Arc.
// Поэтому необходимые для Clone методы реализуем вручную.
impl Clone for State {
fn clone(&self) -> Self {
State(Arc::clone(&self.0))
}

fn clone_from(&mut self, source: &Self) {
*self = source.clone();
}
}

Теперь так же перенесём State в нашу main функцию.

Обновлённый код функции main для сервера (Файл server/src/main.rs)
server/src/main.rs
use anyhow::Result;

use settings::Settings;
use state::State; // +

mod settings;
mod state; // +

fn main() -> Result<()> {
let settings = Settings::new();
let state = State::new(settings)); // +

Ok(())
}

Для серверного приложения состояние и базовые параметры готовы, тоже самое нужно прописать для клиента.

Код структуры Settings для клиента (Файл client/src/settings.rs)
client/src/settings.rs
use clap::Parser;

#[derive(Parser)]
pub struct Args {
// Адрес сервера с портом, к которому будет производится подключение
#[arg(short, long, help = "Server address")]
pub address: String,
}

#[derive(Debug, Clone)]
pub struct Settings {
pub server_address: String,
}

impl Settings {
pub fn new() -> Settings {
let args = Args::parse();

Settings {
server_address: args.address
}
}
}

State для клиента немного отличается, но суть та же. Структура, чтобы хранить состояние приложения, с возможностью раздачи его на несколько потоков.

Код структуры State для клиента (Файл client/src/state.rs)
client/src/state.rs

use std::{
sync::{
// mpsc нужно для передачи сообщений по каналу между несколькими потоками.
// В нашем случае будут два потока (главный и созданный), один из которых
// будет передавать второму сигналы по каналу mpsc.
mpsc::{
Sender,
Receiver,
self
},
Arc
},
io::{
self,
// BufReader будем использовать для чтения данных с tcp сокета.
// Он работает по такому принципу: делает редкие, но объемные read
// запросы по файл дескриптору и далее мы можем удобно, что он прочитал.
// Для чтения строк из tcp сокета это очень хорошо подходит.
BufReader,

// Два трейта. Один для чтения из BufReader'а, другой для записи в файл
// (в нашем случае в сокет).
BufRead,
Write
}
};

use parking_lot::Mutex;

pub struct State {
// Ник, который юзер введёт при запуске приложения
pub username: String,

// Принимающая часть канала mpsc. В качестве типа передаваемых данных
// указан unit (пустой tuple), так как нам нужен будет сам факт наличия
// нового сообщения, его внутренности интересовать не будут.
// Указывается как Option, потому что в будет передана другому потоку и
// после этого доступна не будет и тут будет храниться None.
pub chat_reload_receiver: Option<Receiver<()>>,

// Часть канала mpsc, которая отправляет информацию принимающему потоку.
pub chat_reload_sender: Sender<()>,

// В user_input'е будет лежать текущий ввод пользователя. Пример:
// юзер пишет "привет", но не отправляет его в чат. "приве" лежит
// в user_input'е. Обычно такая реализация не требуется, но у нас часто будет
// полностью перерисовываться чат, и при этом будет пропадать дефолтный
// ввод юзера. Поэтому чтобы это ввод не исчезал, приходится хранить его
// отдельно. Подробнее об этом будет позже, когда перейдём к месту
// реализации ввода сообщения.
pub user_input: Arc<Mutex<String>>,

// Массив, полученных с сервера сообщений.
pub messages: Arc<Mutex<Vec<String>>>
}

impl State {
pub fn new() -> io::Result<State> {
// Создание mpsc канала. Так как функция вернёт tuple, его можно
// сразу разбить на две переменные
let (sx, rx) = mpsc::channel::<()>();

let user_input = Arc::new(Mutex::new(String::new()));
let messages = Arc::new(Mutex::new(Vec::<String>::new()));

let mut instance = State {
username: String::new(),
chat_reload_receiver: Some(rx),
chat_reload_sender: sx,
user_input,
messages,
};

// Вызов метода для получения username'а
instance.read_username()?;

Ok(instance)
}

// Метод, который запрашивает у user'а ввод его ника и
// записывает полученные данные в state.
fn read_username(&mut self) -> io::Result<()> {
// Для некоторых манипуляций с терминалом, будем использовать termion.
// Библиотека позволяет "стирать" все из терминала, красить текст,
// менять режим у stdout'а (об этом позже) и тд.
// В данном случае нам нужно очистить терминал.
println!("{}", termion::clear::All);
print!("Username: ");

// Макрос print! добавляет в буфер текст, но не выполняет flush
// и из-за этого после простого выполнения print! в консоли вы
// ничего не увидите. Чтобы это исправить нужно вызвать flush вручную.
std::io::stdout().flush()?;

let mut username = String::new();

// Чтение строки из stdin и запись содержимого в username
// через передачу мутабельной ссылки на username в read_line.
io::stdin().read_line(&mut username)?;

// Обрезаем с начала и конца ненужные символы
// (пробелы, перенос строки и тд) и записываем в наш объект State.
self.username = username.trim().to_owned();

// Снова всё очищаем.
println!("{}", termion::clear::All);

Ok(())
}
}
Код функции main для клиента (Файл client/src/main.rs)
client/src/main.rs
use std::io;

use crate::{
settings::Settings,
state::State
};

mod settings;
mod state;

fn main() -> io::Result<()> {
let settings = Settings::new();
let state = State::new()?;

Ok(())
}

Прописывание общих типов для клиента и сервера

Теперь когда у нас готовы базовые вещи, можно начинать делать логику.

И так, у нас сервер и клиент будут передавать друг другу сообщения в одном и том же формате. Эти сообщения называются “сигналы”. Сам формат сигналов похож на формат передаваемых данных в http, только очень сильно упрощен.

В начале сигнала идут хедеры (список ниже). Хедеры разделяются символами “\r\n”.

/*
Кто отправляет Поле Значение

USER USERNAME Строка
SERVER AUTH_STATUS "ACCEPTED", "DECLINED"
USER+SERVER WITH_MESSAGE Нет
USER+SERVER SIGNAL_TYPE "CONNECTION", "NEW_MESSAGE"
SERVER SERVER_MESSAGE Нет
*/

/*
Определения хедеров

USERNAME Имя пользователя, от которого пришло сообщение
AUTH_STATUS Статус авторизации
WITH_MESSAGE В сигнале есть сообщение
SIGNAL_TYPE Тип сигнала: запрос на авторизацию или сообщение
SERVER_MESSAGE Серверное сообщение
*/

Потом в случае если в хедерах сигнала есть “WITH_MESSAGE”, то после хедеров идет ещё один разделитель “\r\n” и начинается сообщение, которое заканчивается символами “\r\n\r\n”.

Нам нужно уметь парсить сигналы и легко формировать свои. Для этого нужно прописать ряд типов, которые будут иметь вспомогательные методы, которыми мы будем пользоваться.

Перед этим создадим нашу кастомную ошибку, которую мы будем отдавать при возникновении проблем с парсингом.

Код кастомной ошибки парсинга (начало файла с типами <client & server>/src/types.rs)
<client & server>/src/types.rs
use std::{
// Импорт утилит для форматирования и вывода строк
fmt,

// Импорт трейта Error (все ошибки как правило должны его имлементить)
error::Error
};

// Трейт Debug обязателен для Error, поэтому навешиваем
// макрос Debug на нашу структуру.
#[derive(Debug)]
pub struct ParseSignalDataError;

// impl Error для структуры. Внутри при желании можно не
// реализовывать методы, потому что все они реализованы по умолчанию.
impl Error for ParseSignalDataError {}

// Error так же требует реализации трейта fmt::Display
impl fmt::Display for ParseSignalDataError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "invalid signal data")
}
}

Можно начинать постепенно реализовывать нужные типы. Первым из них будет простенькие enum SignalType. Он описывает 2 варианта возможных типов сигнала: Connection (подключение, отправляется клиентом серверу), NewMessage (новое сообщение, клиент и сервер отправляют их друг другу).

Код enum’а SignalType (продолжение файла с типами <client & server>/src/types.rs)
<client & server>/src/types.rs
// ...

#[derive(Debug, Clone, Copy)]
pub enum SignalType {
Connection,
NewMessage,
}

// Трейт FromStr идет из стандартной библиотеки и добавляет функцию
// для создания нужного типа из строки.
impl FromStr for SignalType {
type Err = ParseSignalDataError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"CONNECTION" => Ok(SignalType::Connection),
"NEW_MESSAGE" => Ok(SignalType::NewMessage),
_ => Err(ParseSignalDataError)
}
}
}

// Трейт ToString также идёт из стандартной библиотеки и добавляет функцию
// для создания уже строки из типа.
impl ToString for SignalType {
fn to_string(&self) -> String {
match self {
// .to_owned() делает из заимствованного типа (ссылки), владеющий.
// В данном случае делает из &str String. Подробно об этом
// останавливаться не буду. Лучше отдельно почитать статьи про
// ownership модель в Rust.
SignalType::Connection => "CONNECTION".to_owned(),
SignalType::NewMessage => "NEW_MESSAGE".to_owned(),
}
}
}

Далее идёт ещё один простой enum AuthStatus. Он содержит значения, которые возвращает сервер в ответ на попытку авторизации юзером. ACCEPTED — авторизация прошла успешно, DENIED — авторизация отклонена. После DENIED соединение сбрасывается.

Код enum’а AuthStatus (продолжение файла с типами <client & server>/src/types.rs)
<client & server>/src/types.rs
// ...

#[derive(Debug, Clone, Copy)]
pub enum AuthStatus {
ACCEPTED,
DENIED
}

impl FromStr for AuthStatus {
type Err = ParseSignalDataError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"ACCEPTED" => Ok(AuthStatus::ACCEPTED),
"DENIED" => Ok(AuthStatus::DENIED),
_ => Err(ParseSignalDataError)
}
}
}

impl ToString for AuthStatus {
fn to_string(&self) -> String {
match self {
AuthStatus::ACCEPTED => "ACCEPTED".to_owned(),
AuthStatus::DENIED => "DENIED".to_owned()
}
}
}

Теперь нужно реализовать enum SignalHeader. Это уже тип поинтереснее, он содержит хедеры сигнала и значения, которые они передают. Подробнее про то, какие есть хедеры и какие значения имеют я писал выше, поэтому на этом не буду заострять особо внимание.

Код enum’а SignalHeader (продолжение файла с типами <client & server>/src/types.rs)
<client & server>/src/types.rs
// ...

pub enum SignalHeader {
Username(String),
AuthStatus(AuthStatus),
SignalType(SignalType),
WithMessage,
ServerMessage
}

impl FromStr for SignalHeader {
type Err = ParseSignalDataError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let (header, value) = s.split_once(':').unwrap_or((s, s));

match header {
"USERNAME" => Ok(SignalHeader::Username(value.trim().to_owned())),
"AUTH_STATUS" => {
match AuthStatus::from_str(value.trim()) {
Ok(v) => return Ok(SignalHeader::AuthStatus(v)),
Err(_) => Err(ParseSignalDataError)
}
},
"SIGNAL_TYPE" => {
match SignalType::from_str(value.trim()) {
Ok(v) => return Ok(SignalHeader::SignalType(v)),
Err(_) => Err(ParseSignalDataError)
}
}
"WITH_MESSAGE" => Ok(SignalHeader::WithMessage),
"SERVER_MESSAGE" => Ok(SignalHeader::ServerMessage),
_ => Err(ParseSignalDataError)
}
}
}

impl ToString for SignalHeader {
fn to_string(&self) -> String {
match self {
SignalHeader::Username(v) => format!("USERNAME: {v}\r\n"),
SignalHeader::AuthStatus(v) => format!("AUTH_STATUS: {}\r\n", v.to_string()),
SignalHeader::SignalType(v) => format!("SIGNAL_TYPE: {}\r\n", v.to_string()),
SignalHeader::WithMessage => "WITH_MESSAGE\r\n".to_owned(),
SignalHeader::ServerMessage => "SERVER_MESSAGE\r\n".to_owned()
}
}
}

Осталось создать самую важную для типов структуры — структуру, которая будет формировать и парсить сигнал. Она самая объёмная в плане кода в файле types.rs. Хотя сам код конечно тривиальный.

Код структуры SignalData (конец файла с типами <client & server>/src/types.rs)
<client & server>/src/types.rs
// ...

#[derive(Debug, Clone)]
pub struct SignalData {
pub username: Option<String>,
pub password: Option<String>,
pub key: Option<String>,
pub auth_status: Option<AuthStatus>,
pub signal_type: Option<SignalType>,
pub with_message: bool,
pub message: Option<String>,
pub server_message: bool
}

impl SignalData {
// Метод создания нового инстанса SignalData. В него передаётся
// массив хедеров, который будет иметь сигнал и сообщение.
// Так как сообщения может не быть, то типом сообщения является
// enum Option. Этот enum идет с растом из коробки и имеет два значения
// Some(v) и None.
pub fn new(headers: Vec<SignalHeader>, message: Option<&str>) -> SignalData {
let mut data = SignalData {
username: None,
password: None,
key: None,
auth_status: None,
signal_type: None,
with_message: false,
message: None,
server_message: false
};

for header in headers {
match header {
SignalHeader::Username(v) => {
data.username = Some(v);
},
SignalHeader::AuthStatus(v) => {
data.auth_status = Some(v);
},
SignalHeader::SignalType(v) => {
data.signal_type = Some(v);
},
SignalHeader::WithMessage => {
data.with_message = true;
data.message = Some(message.unwrap_or("").to_owned());
},
SignalHeader::ServerMessage => {
data.server_message = true;
}
}
}

data
}
}

impl FromStr for SignalData {
type Err = ParseSignalDataError;

// Метод для парсинга данных сигнала.
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut data = SignalData {
username: None,
password: None,
key: None,
auth_status: None,
signal_type: None,
with_message: false,
message: None,
server_message: false,
};
let splitted = s.split("\r\n");
for string in splitted {
let header = match SignalHeader::from_str(string) {
Ok(v) => v,
Err(_) => continue
};

match header {
SignalHeader::Username(v) => {
data.username = Some(v);
},
SignalHeader::AuthStatus(v) => {
data.auth_status = Some(v);
},
SignalHeader::SignalType(v) => {
data.signal_type = Some(v);
}
SignalHeader::WithMessage => {
data.with_message = true;
},
SignalHeader::ServerMessage => {
data.server_message = true;
}
}
}

if data.with_message {
let splitted = s.split_once("\r\n\r\n");
if let Some(v) = splitted {
if v.1.ends_with("\r\n\r\n") {
let string = v.1.to_owned();
data.message = Some(string[..string.len() - 4].to_owned());
}
else {
data.message = Some(v.1.to_owned());
}
}
else {
return Err(ParseSignalDataError);
}
}

if let None = data.signal_type {
return Err(ParseSignalDataError)
}

Ok(data)
}
}

impl ToString for SignalData {
// Метод для преобразования объекта сигнала в строку, для
// отправки клиенту/серверу.
fn to_string(&self) -> String {
let mut res_str = String::new();

if let Some(v) = &self.username {
res_str.push_str(&SignalHeader::Username(v.to_owned()).to_string());
}
if let Some(v) = &self.auth_status {
res_str.push_str(&SignalHeader::AuthStatus(v.clone()).to_string());
}
if let Some(v) = &self.signal_type {
res_str.push_str(&SignalHeader::SignalType(v.clone()).to_string());
}
if self.server_message {
res_str.push_str(&SignalHeader::ServerMessage.to_string());
}
if self.with_message {
if let Some(v) = &self.message {
res_str.push_str(&SignalHeader::WithMessage.to_string());
res_str.push_str("\r\n");
res_str.push_str(&v);
}
}
res_str.push_str("\r\n\r\n");

res_str
}
}

Принятие запросов от клиентов сервером

Для обмена сигналами клиенту и серверу нужно установить соединение. Для этого мы будем использовать TCP сокеты. Сервер при запуске будет открывать один listening (прослушивающий) сокет (сокет, который слушает запросы на соединение), а клиент будет запрашивать подключение к этому сокету.

Вообщем, нам нужно сначала, чтобы сервер создавал при запуске прослушивающий сокет. Для этого создадим отдельную пустую структуру Service, в которой у нас будет единственный метод, отвечающий за запуск нашего сервиса. Метод будет принимать в себя аргументом state, который мы уже формируем при запуске. Оттуда он возьмёт для себя всё необходимое (пока что это только порт, который привяжется к прослушивающему сокету).

Код структуры Service для сервера (Файл server/src/service.rs)
server/src/service.rs
use std::{net::TcpListener, thread, sync::Arc};
use anyhow::Result;
use parking_lot::Mutex;

use crate::state::State;

pub struct Service;

impl Service {
pub fn run(state: State) -> Result<()> {
// Метод bind у TcpListener создаст tcp сокет, привяжет к нему
// адрес, который мы передали, и поставит его в режим прослушивания.
let listener = TcpListener::bind(format!("0.0.0.0:{}", state.get().settings.port))?;

// Метод incoming возвращает итератор, который на каждую
// следующую итерацию вызывает у tcpListener'а метод .accept().
// Этот метод блокирует текущий поток и ждёт пока не появится
// соединение, которое он сможет принять. Поэтому цикл
// не умрет, пока жив tcpListener.
for con in listener.incoming() {

// Для оптимизации работы с подключениями, каждое из них мы будем
// обрабатывать в отдельном потоке. Важно помнить, что переменные,
// которые используются в потоке, перемещаются в него и обратно не
// возвращаются)) По крайней мере в нашем случае.
thread::spawn(move || -> Result<()> {

// Пока что нам с самим соединением делать нечего, так что
// давайте просто выведем адрес подключенного сокета клиента.

// "?" нужен тут чтобы достать из enum'а Result успешный результат.
// Так как con это enum Result<TcpStream, Error>, а он содержит
// значения Ok(v) и Err(e), то "?" вернёт нам значение из Ok,
// то-есть TcpStream. В случае если con это Err, то
// ошибка передастся вверх функции родителю.
println!("{:?}", con?.peer_addr());

Ok(())
});
}

Ok(())
}
}

Осталось вызвать метод run у сервиса в функции main.

Обновленный код функции main для сервера (Файл server/src/main.rs)
server/src/main.rs
use anyhow::Result;

use service::Service;
use settings::Settings;
use state::State;

mod settings;
mod state;
mod service;

fn main() -> Result<()> {
let settings = Settings::new();
let state = State::new(settings);


Service::run(state)?; // +

Ok(())
}

Теперь наконец-то можно запустить сервер и увидеть что-то интересное. Впишите в терминале команду cargo run -- --port 8080 (--port это аргумент, который мы определяли в структуре Settings). Дальше попробуйте отправить на этот порт запрос через curl (команда curl 127.0.0.1:8080) и вы должны увидеть в выводе сервера нечто подобное.

вывод сервера

Он будет выводить вам адреса TCP сокетов клиентов. Так как у любого TCP сокета должен быть какой-то порт, в том числе у того, который производить запрос на подключение, система сама автоматически выдаст ему свободный порт. Поэтому в выводе вы будете видеть локалхост в связке со случайными портами.

Подключение клиента к серверу

Подключение к серверу на клиенте будет завернуто в структуру Connection. В ней будет храниться сам TcpStream (сокет подключённый к серверу) и BufReader, с помощью которого будем читать данные из сокета. Про него я немного рассказал выше в описание кода структуры State для клиента.

Connection будет выполнять 2 функции: подключение к серверу с отправлением всех нужных данных (юзернейма и типа сигнала) и чтение данных из сокета. Для этого будут реализованы методы new и read_signal соответственно.

Код структуры Connection для клиента (Файл client/src/connection.rs)
client/src/connection.rs

use std::{
net::TcpStream,
io::{
self,
Write,
Error,
ErrorKind,
BufRead,
BufReader
},
};

use crate::types::{
SignalType,
SignalHeader,
SignalData,
AuthStatus
};

pub struct Connection {
pub stream: TcpStream,

// инстанс BufReader'а нужен только для внутреннего пользования,
// поэтому делать его публичным нет смысла
reader: io::BufReader<TcpStream>
}

impl Connection {
pub fn new(address: &str, username: &str) -> io::Result<Connection> {
// Формирование сигнала. Передаётся тип сигнала и имя пользователя,
// который подключается.
let signal = SignalData::new(
vec![
SignalHeader::SignalType(SignalType::Connection),
SignalHeader::Username(username.to_owned())
],
None
);

// Метод connect у TcpStream создаст TCP сокет, и
// попытается подключиться к сокету, адрес которого
// был передан аргументом.
let mut connection = TcpStream::connect(address)?;

// Метод write_all запишет байты в сокет и они будут
// переданы сокету на другом конце, то-есть серверу.
connection.write_all(signal.to_string().as_bytes())?;

// Создаём инстанс BufReader'а, который будет читать данные с сокета.
let reader = BufReader::new(connection.try_clone()?);

// Создаём инстанс Connection
let mut instance = Connection {
stream: connection,
reader
};

// С сервера должен прийти AuthStatus. Ждём его и когда он приходит
// в зависимости от статуса, либо отдаём ошибку, либо отдаём
// инстанс Connection.
let data_from_socket = instance.read_signal()?;
if data_from_socket.contains(&AuthStatus::DENIED.to_string()) {
return Err(Error::new(ErrorKind::ConnectionAborted, "Access denied"));
}

return Ok(instance)
}

pub fn read_signal(&mut self) -> io::Result<String> {
let mut res_line = String::new();

// Индикатор того, что хедеры были прочитаны
let mut headers_read = false;
loop {
let mut buf_line = String::new();

// Ридер читает по одной строчке и записывает в буферную переменную.
// Если находит ошибку, то всё вылетает. Если он читает 0 байт, то
// значит, что соединение потеряно (специфика чтения сокетов).
match self.reader.read_line(&mut buf_line) {
Err(e) => panic!("Got an error: {}", e),
Ok(0) => return Err(Error::new(ErrorKind::BrokenPipe, "Connection closed")),
Ok(_) => (),
};
res_line.push_str(&buf_line);

// Если он натыкается на "\r\n\r\n", то значит хедеры закончились
// и если нет хедера WITH_MESSAGE, то сигнал можно считать прочитанным
// и возвращать его. Если есть, то цикл уходить на второй круг
// и читает сообщение.
if res_line.ends_with("\r\n\r\n"){
if !res_line.contains(&SignalHeader::WithMessage.to_string()) || headers_read {
break;
}
headers_read = true;
}
}

Ok(res_line)
}
}

// Реализация трейта Clone для Connection. Просто навесить
// макрос для Clone не получится, потому что нужно чуть иначе клонировать
// TcpStream и BufReader.
impl Clone for Connection {
fn clone(&self) -> Self {
Connection {
stream: self.stream.try_clone().unwrap(),
reader: BufReader::new(self.stream.try_clone().unwrap())
}
}
}

Основная часть логики работы клиента будет описываться в структуре Service. В нашем серверном приложении тоже есть такая структура, но там на неё ложиться ответственность только за запуск приложения. В принципе на данном этапе в клиенте нам тоже требуется только описать запуск.

Начало файла client/src/service.rs
client/src/service.rs
use std::io;
use crate::{
settings::Settings,
state::State,
connection::Connection
};

pub struct Service {
pub connection: Connection,
pub settings: Settings,
pub state: State,
}

impl Service {
pub fn run(settings: Settings, state: State) -> io::Result<()> {
let connection = Connection::new(
&settings.server_address.to_owned(),
&state.username
)?;

let mut instance = Service {
connection,
settings,
state
};

Ok(())
}
}

Теперь добавим вызов нашего нового метода в main функцию клиента. Пока при запуске приложения он просто подключится к серверу и сразу закроется. Более никаких изменений в main’е клиента не будет.

Код main функции клиента (Файл client/src/main.rs)
client/src/main.rs
use std::io;

use service::Service;

use crate::{
settings::Settings,
state::State
};

mod settings;
mod types;
mod connection;
mod state;
mod service;

fn main() -> io::Result<()> {
let settings = Settings::new();
let state = State::new()?;

Service::run(settings, state)?;
Ok(())
}

Пул сообщений на сервере (MessagesPool)

Наш сервер будет работать с подключениями в отдельных потоках. Для каждого пользователя будет создано 2 отдельных потока: на принятие сообщений от него и на отправку сообщений ему от других пользователей. Новые сообщения необходимо отправлять всем пользователям чата, то-есть потоки, пользователей, которые отвечают за отправку им новых сообщений должны получить каким-то образом сигнал о том, что в чате появилось новое сообщение и его надо отправить юзеру.

Для решения этой проблемы я решил создавать на время жизни процесса сервера структуру, которая будет хранить в себе списком 256 последних сообщений. Потоки отправляющие сообщения юзеру будут с определённым интервалом читать сообщения из этой структуры, начиная с последнего прочитанного. А в свою очередь потоки принимающие сообщения от пользователей будут в конец списка добавлять новые сообщения.

Опишем тип самого сообщения, которое будет храниться в пуле.

Начало файла server/src/messages_pool.rs
server/src/messages_pool.rs
use std::{collections::{HashMap, VecDeque}, iter};

#[derive(Debug, Clone)]
pub struct PoolMessage {
// Уникальный идентификатор сообщения
pub id: String,

pub username: String,
pub message: String,

// Сообщениям, отправленным сервером ставится true. Это сообщения
// о входе и выходе пользователя.
pub from_server: bool,
}

impl PoolMessage {
fn new() -> PoolMessage {
PoolMessage {
id: String::new(),
username: String::new(),
message: String::new(),
from_server: false,
}
}
}

Далее приступим к описанию самого пула. Простыми словами пул - это просто массив с ограниченной длинной (256 сообщений). Каждое новое сообщения будет добавляться в конец, а если сообщений в пуле 256, то первое сообщение будет удалено для того чтобы в конце освободилось место для нового.

Продолжение файла server/src/messages_pool.rs
server/src/messages_pool.rs
//...

pub struct MessagesPool {
// Вектор с сообщениям. Используется VecDeque, а не Vec
// для того чтобы можно было удалять и добавлять сообщения
// и в конец и в начало вектора.
pool: VecDeque<PoolMessage>,

// Для упрощения вычислений индексы сообщений в векторе хранятся
// в отдельной таблице.
indexes: HashMap<String, u8>,

// Кол-во сообщений в пуле.
length: u16,
}

impl MessagesPool {
pub fn new() -> MessagesPool {
// Создём вектор на 256 элементов. Метод repeat_with создас бесконечный
// итератор, который будет повторять одни и теже данные. Далее методом
// take можно преобразовать этот итератор в итератор поменьше. И
// collect'ом собираем итератор в коллекцию (в VecDeque в нашем случае).
let arr: VecDeque<PoolMessage> = iter::repeat_with(|| PoolMessage::new())
.take(256)
.collect();

MessagesPool {
pool: arr,
indexes: HashMap::new(),
length: 0
}
}

// Метод push добавляет новое сообщение в пул, обновляет
// внутренние индексы и увеличивает кол-во сообщений. Если
// кол-во сообщений в пуле больше 256, то сообщение на нулевом индексе
// удаляется и в конец записывается новое.
pub fn push(&mut self, v: PoolMessage) {
if self.length == 256 {

// обновляем пул
self.pool.pop_front();
self.pool.push_back(v);

// обновляем индексы
let mut new_indexes: HashMap<String, u8> = HashMap::new();

// enumerate - метод итератора, позволяющий преобразовать итератор
// в итератор, который вместе с текущим значением, отдаёт
// и текущую итерацию.
for (index, message) in self.pool.iter().enumerate() {
new_indexes.insert(message.id.clone(), index as u8);
}
self.indexes = new_indexes;
}
else {
// "as u8" преобразует u16 в u8. Мы не можем поставить полю
// length u8, потому что длина может быть равна 256, а оно
// не входит в u8.
let index = self.length as u8;
self.pool[index as usize] = v.clone();
self.length += 1;
self.indexes.insert(v.id.clone(), index);
}
}

// Возвращает сообщения, начиная с определённого id
// и id последнего сообщения.
fn read_from(&self, id: &str) -> (Vec<PoolMessage>, Option<String>) {
let found_index = self.indexes.get(id);

// Проверяет найдено сообщение с таким id в индексах или нет.
// Если нет, значит в один момент появилось много новых сообщений и их
// не успели отправить пользователю. В таком случае читаем
// сообщения с начала списка.
match found_index {
Some(v) => {
let index: u16 = v.to_owned() as u16 + 1;
let sliced_pool = &Vec::from(self.pool.clone())[index.into()..self.length.into()];
let sliced_pool_last = {
if sliced_pool.len() == 0 {
None
}
else {
Some(sliced_pool.last().unwrap().clone().id)
}
};
return (sliced_pool.clone().into(), sliced_pool_last)
},
None => {
let last_el = self.last();
let index = match last_el {
Some(v) => Some(v.id.clone()),
None => None
};
let sliced_pool = &Vec::from(self.pool.clone())[..self.length.into()];
return (sliced_pool.into(), index)
}
}
}

// Метод проверяющий пул на наличие новых сообщений и возвращает их.
pub fn has_new(&self, id: &str) -> Option<(Vec<PoolMessage>, Option<String>)> {
let last_el = self.last();
match last_el {
Some(_) => Some(self.read_from(id)),
None => None,
}
}

// Возвращает последнее сообщение в соответствии с текущей длиной пула.
fn last(&self) -> Option<PoolMessage> {
let last_index = {
if self.length > 0 {
self.length - 1
} else {
self.length
}
};

let last_el = &self.pool[last_index.into()];
if last_el.id == "".to_owned() {
None
} else {
Some(last_el.to_owned())
}
}
}

Чтение сигналов от пользователя на сервере

Чтение сигналов от клиента на сервере будет аналогично чтению сигналов от сервера на клиенте, которое мы писали для структуры Connection. Единственное отличие, что привязываться метод чтения сигналов будет не к какой-то структуре, у которой внутри лежит BufReader<TcpStream>, а к нему напрямую через реализацию трейта StreamReader.

Trait StreamReader (Файл server/src/reader.rs)
server/src/reader.rs
use std::{io::{BufReader, self, BufRead, Error, ErrorKind}, self, net::TcpStream};

use crate::types::SignalHeader;

pub trait StreamReader {
fn read_signal(&mut self) -> io::Result<String>;
}

impl StreamReader for BufReader<TcpStream> {
fn read_signal(&mut self) -> io::Result<String> {
let mut res_line = String::new();
let mut headers_read = false;
loop {
let mut buf_line = String::new();
match self.read_line(&mut buf_line) {
Err(_) => return Err(Error::new(ErrorKind::ConnectionAborted, "boom boom")),
Ok(0) => return Err(Error::new(ErrorKind::BrokenPipe, "boom boom")),
Ok(m) => m,
};
res_line.push_str(&buf_line);

if res_line.ends_with("\r\n\r\n"){
if !res_line.contains(&SignalHeader::WithMessage.to_string()) || headers_read {
break;
}
headers_read = true;
}
}

Ok(res_line)
}
}

Менеджер подключений (основная серверная логика)

Вся серверная логика будет существовать в рамках структуры Manager. Эта структура будет создаваться отдельно для каждого подключения и заниматься взаимодействием с ним.

Так как логика сама достаточно громоздкая, мы её поделим на два основных типа: работа с сокетом и работа с данными. Разделение это будет производиться за счёт разных trait’ов. То-есть по сути все методы будут привязаны к структуре Manager, но сигнатуры этих методов и реализации будут лежать в отдельных от Manager’а файлах.

Структура Manager (Файл server/src/managers/manager.rs)
server/src/managers/manager.rs
use std::{
net::TcpStream,
io::BufReader,
sync::Arc
};
use parking_lot::Mutex;
use anyhow::Result;

use crate::{state::State, messages_pool::MessagesPool};

pub struct Manager {
// Подключеённый сокет с клиентом
pub stream: TcpStream,

// Ридер, которые будет читать данные с сокета
pub reader: BufReader<TcpStream>,

// Состояние приложения
pub state: State,

// Общий пул сообщений
pub messages_pool: Arc<Mutex<MessagesPool>>,

// Последнее прочитанное из пула сообщение
pub last_read_message_id: String,

// Имя присоединённого юзера
pub connected_user_username: Option<String>,

// Адрес присоединённого юзера
pub connected_peer_addr: String
}

impl Manager {
pub fn new(stream: TcpStream, state: State, messages_pool: Arc<Mutex<MessagesPool>>) -> Result<()> {
let mut manager = Manager {
stream: stream.try_clone()?,
reader: BufReader::new(stream.try_clone()?),
state,
messages_pool,
last_read_message_id: String::new(),
connected_user_username: None,
connected_peer_addr: stream.try_clone()?.peer_addr()?.to_string()
};

Ok(())
}
}

Так как мы создали структуру Manager в отдельной папке, нам нужно определить эту папку как модуль, чтобы иметь возможность взаимодействовать с ней. Для этого в этой папке создайте файл mod.rs и выведите в публичный доступ структуру Manager.

Файл server/src/mangers/mod.rs
server/src/mangers/mod.rs
// Файл server/src/mangers/mod.rs
mod manager;

pub use manager::Manager;

Теперь нужно добавить модуль managers в список модулей в файле main.rs.

Обновление в файле server/src/main.rs
server/src/main.rs
// Файл server/src/main.rs

// ...
mod managers;
// ...

После того как структура Manager и папка для неё были созданы, можно начинать писать два основных trait’а, которые и будут содержать весь функционал менеджера, а именно: StreamManager (взаимодействие с сокетом) и DataManger (взаимодействие с данными).

Начнём с трейта DataManager, так как большая часть его методов потом будет использоваться в StreamManager’е. Итак, он будет содержать следующий функционал: определение стоит или не стоит пускать юзера на сервер, удаление юзера из списка авторизованных и вывод сообщения о его выходе из чата, отправка юзеру новых сообщений и обработка полученных от него.

Код DataManager (Файл server/src/managers/data_manager.rs)
server/src/managers/data_manager.rs
use std::sync::Arc;
use std::sync::mpsc::Receiver;
use std::thread;
use std::time::Duration;
use std::str::FromStr;
use anyhow::Result;
use parking_lot::Mutex;

// Библиотека для генерации уникальных идентификаторов.
use uuid::Uuid;

use crate::messages_pool::{PoolMessage, MessagesPool};
use crate::state::UserData;
use crate::types::{
AuthStatus,
SignalData,
SignalHeader,
AuthConnectionError,
IncomingMessageError,
SignalType
};

use super::manager::Manager;

// !! Будет описан ниже !!
use super::stream_manager::StreamManager;

pub trait DataManager {
// Отправляет клиенту сигнал о неудачной авторизации.
fn deny_auth(&mut self) -> Result<()>;

// Пытается авторизовать пользоватяля
// (проверяет есть ли юзер с таким ником на сервере или нет).
fn auth(&mut self, signal: String) -> Result<()>;

// Удаляет юзера из списка подключенных к серверу и
// отправялет сообщение о его выходе.
fn remove_user(&mut self, username: String) -> Result<()>;

// В цикле просматривает пул сообщений и отправляет новые пользователю.
fn process_messages_pool(&mut self, receiver: Receiver<()>) -> Result<()>;

// Проверяет сигнал нового сообщения и добавляет его в пул сообщений.
fn process_incoming_message(messages_pool: Arc<Mutex<MessagesPool>>, signal: String) -> Result<()>;
}

impl DataManager for Manager {
fn deny_auth(&mut self) -> Result<()> {
let response = SignalData::new(
vec![SignalHeader::AuthStatus(AuthStatus::DENIED)],
None
);

// !! Метод описан в StreamManager ниже !!
self.send_data(&response.to_string())?;
Ok(())
}

fn auth(&mut self, signal: String) -> Result<()> {
let data = SignalData::from_str(&signal)?;

match data.signal_type.unwrap() {
SignalType::Connection => {
if let None = data.username {
return Err(AuthConnectionError.into());
}
let mut state = self.state.get();
if state.users.contains_key(&data.username.clone().unwrap()) {
return Err(AuthConnectionError.into())
}
state.users.insert(data.username.clone().unwrap().to_owned(), UserData {
address: self.stream.peer_addr()?.to_string(),
});
self.messages_pool.lock().push(PoolMessage {
id: Uuid::new_v4().to_string(),
username: String::new(),
message: format!("{} joined the chat!", data.username.clone().unwrap()),
from_server: true
});
}
_ => return Err(AuthConnectionError.into()),
}

self.connected_user_username = Some(data.username.unwrap());

let response = SignalData::new(
vec![SignalHeader::AuthStatus(AuthStatus::ACCEPTED)],
None
);

// !! Метод описан в StreamManager ниже !!
self.send_data(&response.to_string())?;
Ok(())
}

fn remove_user(&mut self, username: String) -> Result<()> {
let mut state = self.state.get();

if state.users.contains_key(&username) {
state.users.remove(&username);
self.messages_pool.lock().push(PoolMessage {
id: Uuid::new_v4().to_string(),
username: String::new(),
message: format!("{username} left the chat!"),
from_server: true
});
}
Ok(())
}

fn process_messages_pool(&mut self, receiver: Receiver<()>) -> Result<()> {
loop {
// Проверяем наличие сообщения от потока, слушающего
// сообщения от пользователя. Если сообщение есть,
// значит соединение потеряно и можно обрывать цикл. Подробнее
// об этом в описании трейта StreamManager.
if let Ok(()) = receiver.try_recv() {
break;
};

// Так как self.messages_pool это Arc, его данные не получиться
// получить просто прописав self.messages_pool.clone().lock(),
// из-за того что клоннированная ссылка на пул сообщений удалится
// сразу из-за отсутстивия владельца. Поэтому надо задать ему владельца
// поместив в отдельную переменную lock_ref.
let lock_ref = self.messages_pool.clone();
let pool_lock = lock_ref.lock();

let messages = pool_lock.has_new(&self.last_read_message_id);
if let Some(v) = messages {
if let Some(last) = v.1 {
self.last_read_message_id = last;
}
for message in v.0 {
let mut syg_vec = vec![
SignalHeader::SignalType(SignalType::NewMessage),
SignalHeader::Username(message.username.clone()),
SignalHeader::WithMessage
];
if message.from_server {
syg_vec.push(SignalHeader::ServerMessage);
}
let response = SignalData::new(syg_vec, Some(&message.message));

// !! Метод описан в StreamManager ниже !!
self.send_data(&response.to_string())?;
}
}

// Ставим таймаут для цикла так, чтобы он работал быстро, но не слишком.
thread::sleep(Duration::from_millis(10));
}

Ok(())
}

// Простая проверка структуры сигнала на валидность сообщения.
fn process_incoming_message(messages_pool: Arc<Mutex<MessagesPool>>, signal: String) -> Result<()> {
let data = SignalData::from_str(&signal)?;

if !data.with_message || data.username.is_none() {
return Err(IncomingMessageError.into())
}

messages_pool.lock().push(PoolMessage {
id: Uuid::new_v4().to_string(),
username: data.username.clone().unwrap(),
message: data.message.clone().unwrap().trim().to_owned(),
from_server: false
});

Ok(())
}
}

Теперь трейт StreamManager. У него будет 4 основные задачи: обработка подключения, обработка отключения, передача данных юзеру и зацикленное чтение данных из сокета, то-есть сообщений приходящих от юзера.

Trait StreamManager (Файл server/src/managers/stream_manager.rs)
server/src/managers/stream_manager.rs
use std::{ 
io::{
Write, BufReader
},
time::Duration,
thread,
sync::{
mpsc::{
self,
Sender
}
}
};
use anyhow::Result;

use crate::{reader::StreamReader, types::SignalType};

// Импорт из super означает, что мы импортируем из
// того же пространства имён, в котором находимся.
use super::manager::Manager;

// Даём название новому трейтку и обозначаем, какие функции
// будут в него входить.
pub trait StreamManager {
// Вызывается в момент получения нового соединения, то-есть
// сразу после создания инстанса структуры Manager.
fn process_connection(&mut self) -> Result<()>;

// Вызывается, когда соединение с клиентом разрывается.
fn process_disconnection(&mut self) -> Result<()>;

// Отправляет данные в сокет, то-есть юзеру.
fn send_data(&mut self, data: &str) -> Result<()>;

// В цикле в отдельном потоке читает данные с сокета,
// которые посылает юзер.
fn process_signals(&mut self, sender: Sender<()>) -> Result<()>;
}

impl StreamManager for Manager {
fn process_connection(&mut self) -> Result<()> {
println!("Connection established - {}", self.connected_peer_addr);

// С помощью метода read_signal, который мы реализовали для
// BufReader'а с типом TcpStream, читаем данные, которые юзер
// отправляет для входа на сервер. Если произошла ошибка, то
// вызываем метод process_disconnection и прерываем выполнение функции.
let auth_data = match BufReader::new(
self.stream.try_clone()?
).read_signal() {
Ok(v) => v,
Err(_) => {
self.process_disconnection()?;
return Ok(())
}
};

// Проверяем данные введённые пользователем, в случае если они
// некорректные, то отсылаем пользователю сообщение с неудачным
// подключением и обрываем соединение.
if self.auth(auth_data.clone()).is_err() {
self.deny_auth()?;
self.process_disconnection()?;
return Ok(())
}

// Создаём канал для общения между потоками. Нужно для того чтобы
// можно было отправить receiver потоку (главному) сигнал о том, что
// соединение с пользователем потеряно.
let (channel_sender, channel_receiver) = mpsc::channel::<()>();

// Вызываем метода для получения сигналов от пользователя.
self.process_signals(channel_sender)?;

// Вызываем метод для мониторинга новых сообщений в пуле.
self.process_messages_pool(channel_receiver)?;

// Так как process_messages_pool будет цикличным, будет вызван
// в текущем потоке и его завершение будет означать конец соединения
// с пользователем, то после него необходимо вызвать process_disconnection.
self.process_disconnection()?;
Ok(())
}

fn process_disconnection(&mut self) -> Result<()> {
// Удаляем юзера из списка подключённых пользователей
// и выводим в чат сообщением о его уходе.
if self.connected_user_username.is_some() {
self.remove_user(self.connected_user_username.clone().unwrap())?;
}
println!("Connection closed - {}", self.connected_peer_addr);
Ok(())
}

fn send_data(&mut self, data: &str) -> Result<()> {
// Записываем в сокет через метод write строку в виде байтов.
self.stream.write(data.as_bytes())?;
Ok(())
}

fn process_signals(&mut self, sender: Sender<()>) -> Result<()> {
// Клонируем переменные для дальнейшей передачи их в
// новый поток.
let cloned_stream = self.stream.try_clone()?;
let cloned_messages_pool = self.messages_pool.clone();

// Создание нового потока. Так как переменные с сокетом и пулом
// мы по сути переносим в новый поток, нам нужно перед объявлением
// closure (анонимная функция) прописать слово move.
thread::spawn(move || -> Result<()> {
// Создаём новый reader и в цикле читаем из него сигналы.

let mut reader = BufReader::new(cloned_stream.try_clone()?);
loop {
let data_from_socket = match reader.read_signal() {
Ok(s) => s,
Err(_) => {
break;
}
};

// Метод process_incoming_message статический, поэтому вызываем
// его не у инстанса, а у типа напрямую через Self.
match Self::process_incoming_message(cloned_messages_pool.clone(), data_from_socket) {
Ok(_) => (),
Err(_) => println!("invalid message")
};
}

// Если цикл остановился, значит соединение оборвалось.
// Уведомляем основной поток менеджера об этом.
sender.send(())?;

Ok(())
});

Ok(())
}
}

После того, как у нас описаны два наших главных трейта для Manager’а, нужно вызвать метод process_connection в методе new.

Финальный Manager (Файл server/src/managers/manager.rs)
server/src/managers/manager.rs
use std::{
net::TcpStream,
io::BufReader,
sync::Arc
};
use parking_lot::Mutex;
use anyhow::Result;

use crate::{state::State, messages_pool::MessagesPool};
use super::stream_manager::StreamManager; // +

pub struct Manager {
pub stream: TcpStream,
pub reader: BufReader<TcpStream>,
pub state: State,
pub messages_pool: Arc<Mutex<MessagesPool>>,
pub last_read_message_id: String,
pub connected_user_username: Option<String>,
pub connected_peer_addr: String
}

impl Manager {
pub fn new(stream: TcpStream, state: State, messages_pool: Arc<Mutex<MessagesPool>>) -> Result<()> {
let mut manager = Manager {
stream: stream.try_clone()?,
reader: BufReader::new(stream.try_clone()?),
state,
messages_pool,
last_read_message_id: String::new(),
connected_user_username: None,
connected_peer_addr: stream.try_clone()?.peer_addr()?.to_string()
};

manager.process_connection()?; // +
Ok(())
}
}
Финальный файл модуля managers (Файл server/src/managers/mod.rs)
server/src/managers/mod.rs
// Файл server/src/managers/mod.rs
mod manager;
mod stream_manager; // +
mod data_manager; // +

pub use manager::Manager;

Осталось обновить наш метод run у структуры Service, так чтобы при каждом подключении создавался новый инстанс менеджера.

Финальный Service (Файл server/src/service.rs)
server/src/service.rs
use std::{net::TcpListener, thread, sync::Arc};
use anyhow::Result;
use parking_lot::Mutex;

use crate::{state::State, managers::Manager, messages_pool::MessagesPool}; // +

pub struct Service;

impl Service {
pub fn run(state: State) -> Result<()> {
let listener = TcpListener::bind(format!("0.0.0.0:{}", state.get().settings.port))?;

let messages_pool = Arc::new(Mutex::new(MessagesPool::new())); // +

for con in listener.incoming() {
let cloned_state = state.clone(); // +
let cloned_messages_pool = messages_pool.clone(); // +
thread::spawn(move || -> Result<()> {
Manager::new(con?, cloned_state, cloned_messages_pool)?; // +

Ok(())
});
}

Ok(())
}
}

Отправка и вывод сообщений на клиенте (основная клиентская логика)

Для того чтобы наш чат стал похож на чат необходимо реализовать считывание вводимых пользователем символов, отправку сообщений на сервер, а также вывод новых. Как оказалось это не так тривиально). Во время разработки этой части, в первую очередь я столкнулся со следующей проблемой: после ввода пользователем сообщения, и нажатия клавиши enter, результат его ввода выводится в терминал и засоряет чат, так как выводится он не в виде сообщения, а просто как строка, которую ввел юзер.

Решить эту проблему можно, переведя терминал в raw mode. В этом режиме программа будет сразу получать сигналы о нажатых кнопках, а так же выводиться в терминал пользователю ничего не будет. Мы будем обрабатывать сигналы о нажатых клавишах, и основываясь на них редактировать ввод пользователя, который хранится у нас в буфере. Собственно, для этого ранее мы и создали поле user_input в состоянии приложения.

Переводить в raw mode наш терминал, слушать нажимаемые пользователем клавиши, красить и стирать символы мы будем с помощью библиотеки termion. Всё это будет происходить в основном потоке приложения и описано в отдельном методе структуры Service.

Метод чтения вводимых юзером символов и отправки сообщений (Продолжение файла client/src/service.rs)
client/src/service.rs
use std::{
thread,
io::{
self,
Write
},
str::FromStr
};
use termion::{
raw::IntoRawMode,
input::TermRead
};
use crate::{
settings::Settings,
state::State,
connection::Connection,
types::{
SignalType,
SignalData,
SignalHeader
}
};

// ...

pub fn read_inputs(&mut self) {
// Перевод терминала в raw mode через метод полученный
// от ранее импортированного трейта IntoRawMode.
// Обязательно должно быть присвоено к переменной, даже если она не
// используется, так как в обратном случае он сразу после этой строки
// снимет raw mode. stdout в данном случае это тип RawTerminal<Stdout>,
// и для него отдельно реализован трейт Drop с методом drop,
// который выполняет сам Rust после того, как память выделенная под
// данные была освобождена. Конкретно в нашем случае метод drop возвращает
// терминал к прошлому состоянию.
let stdout = io::stdout().into_raw_mode().unwrap();


// С помощью метода keys, полученного от трейта TermRead, импортированного
// ранее, получаем итератор нажатых пользователем клавиш.
let mut stdin = io::stdin().keys();

loop {
// У всех итераторов есть метод next, которым можно получить
// следующую итерацию чего-либо.
let input = stdin.next();

if let Some(Ok(key)) = input {
// Определяем какая именно клавиша была нажата
match key {
// При нажатии на Ctrl+C, останавливаем цикл.
termion::event::Key::Ctrl('c') => break,

// При нажатии на Enter формируем и отправляем на сервер новое
// сообщение, при этом очищая user_input. Или в случае, если
// user_input это пустая строка, перерисовываем чат.
termion::event::Key::Char('\n') => {
let ms = self.state.user_input.lock().clone().trim().to_owned();
if ms == "" {
// Отправляет сигнал второму потоку, занимающемуся выводом
// сообщений и пользовательского ввода (он будет ниже),
// что чат нужно перерисовать. В случае, если получаем ошибку -
// останавливаем цикл.
match self.state.chat_reload_sender.send(()) {
Ok(_) => {},
Err(_) => break,
};

continue;
}
self.state.user_input.lock().clear();

// Формируем структуру сигнала.
let signal = SignalData::new(
vec![
SignalHeader::SignalType(SignalType::NewMessage),
SignalHeader::WithMessage,
SignalHeader::Username(self.state.username.to_owned())
],
Some(&ms)
);

// Переводим сигнал в строку, затем в байты и записываем
// в TcpStream, который является сокетом, соединённым с подключённым
// сокетом на сервере.
self.connection.stream.write_all(signal.to_string().as_bytes()).unwrap();
},

// При нажатии Backspace удаляем из инпута последний символ
// и отправляем сигнал на перерисовывание чата вместе с выводом.
termion::event::Key::Backspace => {
self.state.user_input.lock().pop();
match self.state.chat_reload_sender.send(()) {
Ok(_) => {},
Err(_) => continue,
};
}

// При нажатии клавиши, являющейся каким-то символом, добавляем его в
// user_input и отправляем сигнал на
// перерисовывание чата вместе с выводом.
termion::event::Key::Char(k) => {
println!("{k}");
self.state.user_input.lock().push_str(&k.to_string());
match self.state.chat_reload_sender.send(()) {
Ok(_) => {},
Err(_) => continue,
};
},

// Во всех остальных случаях переходим на следующую итерацию loop'а.
_ => {
continue;
}
}
}
}
}

// ...

После того, как мы научились отправлять полученные от клиента сообщения на сервер, нужно научиться принимать уже сообщения от сервера. Делается это не очень сложно. В отдельном потоке запускаем цикл, который будет слушать сигналы, полученные от сервера, и новые сообщения добавлять в вектор messages, который мы ранее добавляли в State нашего приложения.

Метод получения новых сообщений от сервера (Продолжение файла client/src/service.rs)
client/src/service.rs
// ...

pub fn proccess_incoming_messages(&self) {
let messages = self.state.messages.clone();
let tx = self.state.chat_reload_sender.clone();
let mut connection = self.connection.clone();

thread::spawn(move || -> io::Result<()> {
loop {
// Читаем сигнал из сокета
let data_from_socket = match connection.read_signal() {
Ok(v) => v,
Err(_) => break
};

let signal = SignalData::from_str(&data_from_socket);

// Ставим замок на сообщения, чтобы другие потоки
// не могли к нему прикасаться
let mut messages = messages.lock();

if let Ok(s) = signal {
if let Some(SignalType::NewMessage) = s.signal_type {
// Добавляем разные сообщения в вектор messages в зависимости
// от того, какого они типа. server_message означает, что
// это сообщение от сервера, то-есть о выходе/входе пользователя,
// и у него другой немного формат и стили.
if s.server_message {
messages.push(
// Добавление цветов и стилей сообщению
format!(
"{}{}{}{}",
termion::style::Faint,
termion::style::Bold,
s.message.unwrap(),
termion::style::Reset,
)
);
}
else {
messages.push(
format!(
"<{}> {}",
s.username.unwrap(),
s.message.unwrap()
)
);
}
}
}
// Отправляем сигнал на перерисовывание чата.
match tx.send(()) {
Ok(_) => {},
Err(_) => break
};
}

Ok(())
});
}

// ...

Остался сам метод, который выводит сообщения и ввод пользователя в терминал. Работать сам метод будет очень просто. Он в отдельном потоке в цикле будет ждать сигналы от других потоков для перерисовки чата и, соответственно, перерисовывать его в момент получения сигнала. Единственная тонкость заключается в том, что этому методу нужно будет передавать в новый поток receiver канала, который используется для передачи сигналов других потоков этому. Суть в том, что receiver нельзя скопировать, так как он может быть только один, поэтому перенос это единственный вариант. Но просто взять и перенести из структуры self его не получится, поэтому этот метод будет брать текущую структуру Service, перетаскивать из неё receiver в поток и возвращать новую структуру Service, но уже без reciever’а.

Метод включения вывода сообщений и текущего ввода пользователя в терминал (Продолжение файла client/src/service.rs)
client/src/service.rs
// ...

pub fn enable_print(self) -> Service {
let rx = self.state.chat_reload_receiver.unwrap();
let messages = self.state.messages.clone();
let user_input = self.state.user_input.clone();
let username = self.state.username.clone();

thread::spawn(move || -> io::Result<()> {
loop {
// Ждём сигналов от других потоков.
match rx.recv() {
Ok(()) => {},
Err(_) => break
};

// Очищаем полностью терминал.
print!("{}", termion::clear::All);

// Выводим все сообщения.
// lock поставит замок на сообщения, iter вернёт итератор,
// а enumerate вернёт итератор, у которого на каждой итерации
// дополнительно будет отдаваться и текущий номер итерации.
for (index, m) in messages.lock().iter().enumerate() {
if index == 0 {
print!("\r\n{m}\r\n");
}
else {
print!("{m}\r\n");
}
}

// Добавляем вывод строки с вводом пользователя.
let input = user_input.lock().clone();
print!(
"{}{}{} >{} {}",
termion::color::Bg(termion::color::White),
termion::color::Fg(termion::color::Black),
username,
termion::style::Reset,
input
);

// Отправляем все строки из буфера, помещённые туда через
// макрос print! в stdout.
std::io::stdout().flush()?;
}
Ok(())
});

// Возвращаем новую структуру.
Service {
connection: self.connection,
settings: self.settings,
state: State {
username: self.state.username.clone(),
chat_reload_receiver: None,
chat_reload_sender: self.state.chat_reload_sender.clone(),
user_input: self.state.user_input.clone(),
messages: self.state.messages.clone(),
}
}
}

// ...

Добавляем все написанные методы в методе run структуры Service и всё готово :)

Финальный метод запуска сервиса клиента (Продолжение файла client/src/service.rs)
client/src/service.rs
// ...

pub fn run(settings: Settings, state: State) -> io::Result<()> {
let connection = Connection::new(
&settings.server_address.to_owned(),
&state.username
)?;

let mut instance = Service {
connection,
settings,
state
}.enable_print(); // +

instance.proccess_incoming_messages(); // +
instance.read_inputs(); // +

Ok(())
}

// ...

Теперь вы можете запустить клиент командой (только со своим портом) cargo run -- -a 127.0.0.1:8080 и у вас откроется поле ввода никнейма, после которого вы перейдёте в чат вашего локального сервера.


Спасибо за внимание! :)

Ссылка на полный код проекта — https://github.com/IDSaves/terminal-chat Ссылка на клиент чата на сайте crates.io (быстрая установка для тех, у кого установлен Rust на компьютере) — https://crates.io/crates/tchat