Создание пула потоков и сохранение в него потоков

Предостережения компилятора сообщают о том, что код не использует методы и параметры. Далее мы реализуем функционал.

Проверка количества потоков в пуле

Для начала рассмотрим метод new. Он получает целочисленное положительное значение. Обратите внимание, что пул с нулевым количеством потоков имеет смысл, т.к. 0 может иметь тип u32. Реализуем проверку значения параметра количества потоков перед возвращение экземпляра ThreadPool и используем макрос panic! если значение равно

  1. Для этого используем макрос assert!:

Filename: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
# pub struct ThreadPool;
impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: u32) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // ...snip...
}
#}

код 20-13: реалазация функции ThreadPool::new, которая прервёт работу программы, если переменная size будет равна 0

Обратите внимание, что добавили информацию для генерации документации. Это хороший тон добавлять секции документации. Она будет весьма полезна, чтобы узнать почему же код не сработал (это мы обсуждали в главе 14). Запустим нашу программу с помощью команды cargo doc --open и посмотрим на созданную документацию.

Вместо добавления макроса assert! мы также могли бы использовать решение и примера 12-9. Если вы уверены в своих знаниях Rust API, реализуйте метод new такого вида:

fn new(size: u32) -> Result<ThreadPool, PoolCreationError> {

Сохранение потоков в пуле

После того, как вы проверили корректность входных данных мы можем приступить к созданию нужного количества потоков и сохранить их в экземпляре структуры ThreadPool перед тем как возвратить экземпляр.

Возникает вопрос, как же всё-таки сохранять потоки? Рассмотрим метод thread::spawn ещё раз:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static

spawn возвращает экземпляр JoinHandle<T>, где T является типом возвращаемого значения из замыкания. Попробуем использовать JoinHandle<T>. В нашем случае, замыкание, которое мы посылаем в пул потоков будет обрабатывать соединение и ничего не возвращать. Т.е. T будет действительно пока будет действителен пустой кортеж ().

Приведём идею решения, которое ещё пока не будет компилироваться (код 201-14). Мы изменим определение ThreadPool, чтобы он хранил вектор с объемом и размером, реализуем цикл for таким образом, чтобы о н создавал потоки и возвращал экземпляр ThreadPool:

Filename: src/lib.rs

use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // ...snip...
    pub fn new(size: u32) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool {
            threads
        }
    }

    // ...snip...
}

20-14: создание вектора для хранения потоков в ThreadPool

Мы добавили использование std::thread, т.к. нам понадобиться использовать thread::JoinHandle в качестве типа элементов в ThreadPool. Мы ещё не использовали функцию with_capacity. Она делает то же самое, что и Vec::new - изменяет размеры при вставке элементов. Поскольку мы создали вектор нужного размера, который нам нужен, никаких изменений размера не потребуется.

Давайте скомпилируем этот код и посмотрим на ошибку:

$ cargo check
   Compiling hello v0.1.0 (file:///projects/hello)
error[E0308]: mismatched types
  --> src\main.rs:70:46
   |
70 |         let mut threads = Vec::with_capacity(size);
   |                                              ^^^^ expected usize, found u32

error: aborting due to previous error

size имеет тип u32. Функции Vec::with_capacity нужен входной параметр типа usize. У нас есть две опции - мы можем изменить тип параметра функции или мы можем привести тип u32 к типу usize. Как вы помните, когда мы объявляли функцию new мы не задумывались о типе входных данных. Задумаемся сейчас. Тип данных usize имеет большое значение для вектора. Давайте изменим описание функции:

fn new(size: usize) -> ThreadPool {

Если вы запустите команду cargo check - код скомпилируется.

Как же всё-таки создавать потоки в цикле? Мы ещё не можем знать для чего они нам понадобятся - мы просто вносим замыкания Это сложный вопрос. Какие должен идти в этих потоках? Мы не знаем, какую работу они должны делать при этом поскольку метод execute берет замыкание и передаем его в пул.

Давим некоторых изменений. Вместо создания экземпляров JoinHandle<()>, создадим новую структуру, которая реализует концепцию Рабочий (Worker). Рабочий будет получать замыкание в методе execute и будет вызывать её. Дополнительно, нам даст это возможность иметь ограниченное количество экземпляров Рабочих и будет реализована абстракция.

Итак, реализуем сттукру. Описание изменений:

  1. Создадим структуры Worker, полями которой будут id и JoinHandle<()>
  2. Сделаем так, чтобы вектор в ThreadPool содержал бы экземпляры Worker
  3. Опишем функцию Worker::new, чтобы она получала бы id и возвращала бы экземпляр Worker с id и поток с пустым замыканием
  4. В ThreadPool::new, будем использовать цикл for и его счётчик будет id. Будет создаваться экземпляр Worker с этим id и этот вектор будет сохраняться в вектор.

Если вы чувствуете в себе силы, пожалуйста, реализуйте свое решение этой задачи, а потом посмотрите на решение 20-15.

Готовы? Вот код 20-15, который реализует with one way to make these modifications:

Filename: src/lib.rs


# #![allow(unused_variables)]
#fn main() {
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // ...snip...
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool {
            workers
        }
    }
    // ...snip...
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker {
            id,
            thread,
        }
    }
}
#}

код 20-15: изменения структуры ThreadPool для хранения экземпляров Worker вместо хранение непосредственно потоков

Мы решили изменить имя поля с threads на workers, т.к. мы изменили тип данных поля. Мы используем счётчик в цикле.

Этот код компилируется и сохраняет экземпляры структур. Пока мы никак не обрабатываем, т.е. не делаем никаких действий, не предоставляем никаких действий над потоками. Об этом мы поговорим в следующий секции.