Article écrit par Hugo Fabre
Pouvoir exécuter plusieurs tâches en parallèle, que ce soit dans un script ou une application, peut être vraiment très utile, surtout dans le cas où le traitement de ces tâches peut être très long. Pour pallier ce problème, la plupart des langages de programmation nous offrent plusieurs solutions. Dans notre cas, nous utiliserons des threads. Pour ceux qui ne sont pas familiers avec l’utilisation des threads en Ruby, je vous invite à regarder cette vidéo qui traite du sujet.
Pourquoi une thread pool ?
Pourquoi utiliser une thread pool alors qu’on pourrait juste générer un nouveau thread pour chaque tâche ?
Effectivement, on pourrait se contenter de lancer un thread par tâche et d’attendre le résultat. Le souci avec cette approche c’est que si vous avez beaucoup de tâches à effectuer, vous allez surcharger votre machine qui a de grandes chances de planter.
C’est parti !
Avant toute chose, si vous rencontrez des can't alloc thread (ThreadError)
en utilisant Ruby 2.5.0, je vous conseille d’utiliser la version 2.4.2, 2.5.3, ou 2.6.0. Je suppose que le problème est lié à ce bug
Les tâches à effectuer
Nous allons d’abord écrire une classe qui représentera les différentes tâches à exécuter via nos threads.
class LongTask def initialize(value) @value = value end def run sleep(10) print "#{@value}n" end end
C’est une tâche très simple, mais grâce à l’utilisation de sleep(10)
, on s’assure qu’elle prendra au moins 10 secondes à s’exécuter ce qui nous permettra de mieux visualiser le fonctionnement de notre thread pool.
Pour tester, vous pouvez déjà lancer quelques tâches :
En synchrone :
5.times do |i| LongTask.new(i).run end
En asynchrone :
tasks = [] threads = [] 5.times do |i| tasks << LongTask.new(i) end tasks.each do |task| threads << Thread.new do task.run end end threads.each(&:join)
Comme vous avez pu le remarquer si vous avez testé les deux façons de faire présentées ci-dessus, lorsqu’on lance les tâches dans des threads on ne sait pas dans quel ordre elles seront exécutées, il faudra être conscient de ça dans l’utilisation de notre thread pool.
La thread pool
Pour notre thread pool nous aurons besoin d’un tableau pour stocker les threads en cours d’exécution, d’une file pour stocker les tâches en attentes et enfin d’une taille maximum, qui nous permettra justement d’éviter les problèmes qu’on pourrait causer en générant trop de threads en même temps. Je vous propose une implémentation très simple pour commencer :
class ThreadPool def initialize(size: 10) @size = size @tasks = Queue.new @pool = [] end def schedule(*args, &block) @tasks << [block, args] end end
Effectivement c’est simple, mais comment on l’utilise ?
Nous allons avoir besoin d’une méthode pour démarrer notre thread pool. Mais avant de la coder, il faut savoir ce qu’on va y faire. Donc l’objectif de notre classe sera d’attendre en continu la présence d’une ou plusieurs tâches dans la file puis de les dépiler pour les exécuter une par une dans des threads séparés dans la limite du nombre de threads disponibles™. Chaque thread aura la charge de s’enlever de notre thread pool une fois son traitement terminé.
Facile !
class ThreadPool # ... def start Thread.new do loop do next if @pool.size >= @size task, args = @tasks.pop thread = Thread.new do task.call(*args) end_thread(thread) end @pool << thread end end end def end_thread(thread) @pool.delete(thread) thread.kill end # ... end
Attention, à noter l’encapsulation du travail de la méthode dans un thread. Si on ne le fait pas notre méthode devient bloquante et impossible d’y ajouter des tâches après l’avoir lancée. Et si on testait pour voir ?
thread_pool = ThreadPool.new thread_pool.start 15.times do |i| thread_pool.schedule do LongTask.new(i).run end end
Eh bien ? Il ne se passe rien…
Effectivement il ne se passe pas grand-chose, mais pourquoi ? Ici le souci c’est que l’on démarre notre thread pool en asynchrone qui attend d’avoir des tâches pour les effectuer. Le problème c’est que comme elle tourne en asynchrone, notre programme se termine avant qu’elle ait fini de jouer toutes ses tâches.
Ici c’est en fait un faux problème. Selon l’utilisation que vous aurez de votre thread pool on pourrait très bien s’arrêter ici. Par exemple, imaginons que j’aie une application qui écoute via la Gem listen un dossier donné, avec pour rôle de déplacer les fichiers entrants en suivant des règles spécifiques (dans des dossiers différents selon leurs noms par exemple). Cette application sera destinée à tourner sans s’arrêter et du coup le problème n’existe pas, puisqu’on ne terminera jamais volontairement l’application.
Mais pour rendre les choses plus intéressantes, je vous propose de moderniser un peu notre thread pool.
Aller plus loin
Attendre que la thread pool n’ait plus de tâches à effectuer pour s’arrêter
Très simple, il suffit de rajouter une méthode pour savoir si notre thread pool est active :
class ThreadPool # ... def inactive ? @tasks.empty ? && @pool.empty ? end # ... end
Et ensuite, on vérifie régulièrement pour savoir si notre thread pool est toujours active, notre code de test devient donc :
thread_pool = ThreadPool.new thread_pool.start 15.times do |i| thread_pool.schedule do LongTask.new(i).run end end sleep(1) until thread_pool.inactive ?
Gestion d’erreur
Avec notre implémentation, il n’est pas trop compliqué de gérer et reporter les erreurs qui se produiraient lors du traitement d’une tâche. Pour cela nous allons encapsuler chaque lancement de tâche et son thread associé dans un objet spécifique Processor
. Cela nous permettra d’identifier chaque tâche qui a échoué et pourquoi. Ce Processor
aura connaissance de la tâche à effectuer, les arguments à lui passer, son status (réussi ou raté), le thread dans lequel la tâche a été lancé, les éventuels messages d’erreur et enfin un manager
qui est en fait notre thread pool. Nous en avons besoin pour pouvoir lui signifier qu’un traitement est fini et le prévenir en cas d’erreur.
class Processor attr_reader :thread def initialize(task, args, manager) @task = task @args = args @manager = manager @success = false @thread = nil @error = nil end def run @thread = Thread.new do begin @task.call(*args) @success = true rescue => e @success = false @error = e.message @manager.add_failed_processor(self) end @manager.end_processor(self) end end def success ? @success end def fail ? !success ? end end
Ensuite on va devoir changer quelques petites choses dans notre thread pool. D’abord, on va changer la méthode end_thread
qui deviendra end_processor
vu que l’on gère maintenant des processeurs qui encapsulent nos threads et rajouter une méthode pour stocker les processeurs qui ont raté
class ThreadPool # ... def end_processor(processor) @pool.delete(processor) processor.thread.kill end def add_failed_processor(processor) @failed_processors << processor end # ... end
On pense à rajouter notre nouvelle variable d’instance @failed_processors = []
et ensuite on change légèrement notre méthode start
de la thread pool :
class ThreadPool # ... def start @waiting_thread = Thread.new do loop do next if @pool.size >= @size task, args = @tasks.pop processor = Processor.new(task, args, self) processor.run @pool << processor end end end # ... end
Nous sommes fin prêts pour lancer tout ça et avoir un petit reporting à la fin. Il faudra quand même rajouter un attr_reader :failed_processors
à notre thread pool.
thread_pool = ThreadPool.new thread_pool.start 15.times do |i| thread_pool.schedule do LongTask.new(i).run end end sleep(1) until thread_pool.inactive ? puts "Failed tasks : #{thread_pool.failed_processors}" # Results => # ... # 12 # 13 # 14 # Failed tasks : []
C’est fini !
Évidemment il faut garder à l’esprit que c’est un choix d’implémentation parmi tant d’autres, avec ses qualités et ses défauts. J’ai choisi cette approche parce qu’elle me permet d’avoir quelque chose de simple et de très facilement modulable, par exemple j’ai pu rajouter un système de Timeout
sur mes processeurs ainsi qu’un système de retry limité (4 retries maximum par exemple).
Pour voir une autre implémentation très intéressante, je vous invite à lire cet article dont je me suis pas mal inspiré (en anglais).