Multiprocesamiento en Python: Threads a fondo, enumeración, herencia y temporizadores

En el anterior artículo de la serie nos introdujimos más a fondo en el manejo de hilos en Python. En esta nueva entrega de multiprocesamiento en Python vamos a ver los threads aún más a fondo aprendiendo a enumerar todos los threads, heredar del objeto Thread y usar threads con temporizador.

En cada nuevo artículo de la serie iremos profundizando cada vez más en los entresijos del multiprocesamiento en Python y pondremos nuestros conocimientos en práctica escribiendo una serie de benchmarks y tests para comprobar que nuestras aplicaciones se comportan como esperamos de ellas.

Enumerar los hilos

Como vimos en el artículo anterior, es posible crear hilos como daemons y debemos utilizar el método join() de forma explícita para esperar a que uno de ellos finalice.

Realmente no es necesario mantener un manejador explícito a cada hilo daemonizado para asegurarnos de que hayan completado su tarea antes de salir del proceso. Podemos utilizar el método enumerate() que devuelve una lista de todos las instancias de hilos activos.

Hay que tener cuidado porque la lista incluye el hilo principal (main thread) y hacer un join() al hilo principal produce de forma irremisible un deadlock (¿recuerdas lo que es un deadlock de la serie de multiprocesamiento en C++?. En caso contrario revisa este post).

Para nuestros ejemplos de hoy vamos a utilizar la misma configuración para el módulo logging que usamos en el artículo anterior así que voy a obviarla del código de los ejemplos, si necesitas revisarla comprueba el anterior post de la serie:

import random
import threading
import time
import logging
def worker():
    ct = threading.currentThread()
    p = random.randint(1, 5)
    logging.debug('durmiendo %s', p)
    time.sleep(p)
    logging.debug('despertando y saliendo')
    return
for i in range(4):
    tr = threading.Thread(target=worker)
    tr.setDaemon(True)
    tr.start()
# hilo principal
mt = threading.currentThread()
for th in threading.enumerate():
    # si es el hilo principal saltar o entraremos en deadlock
    if th is mt:
        continue
    logging.debug('haciendo join a %s', th.getName())
    th.join()
La salida del código anterior puede variar entre ejecuciones debido a que el tiempo de espera es aleatorio:

python enumera_hilos.py
[DEBUG] – Thread-1   : durmiendo 5
[DEBUG] – Thread-2   : durmiendo 2
[DEBUG] – Thread-3   : durmiendo 1
[DEBUG] – Thread-4   : durmiendo 5
[DEBUG] – MainThread : haciendo join a Thread-1
[DEBUG] – Thread-3   : despertando y saliendo
[DEBUG] – Thread-2   : despertando y saliendo
[DEBUG] – Thread-1   : despertando y saliendo
[DEBUG] – MainThread : haciendo join a Thread-4
[DEBUG] – Thread-4   : despertando y saliendo
[DEBUG] – MainThread : haciendo join a Thread-3
[DEBUG] – MainThread : haciendo join a Thread-2

Derivando la clase Thread

Todo hilo debe realizar algunas operaciones básicas de inicialización y entonces llamar a su método run run() que llama a la función objetivo pasada en el constructor. Para crear una clase derivada de Thread es necesario sobreescribir el método run():
import threading
import logging
class GenThread(threading.Thread):
    def run(self):
        logging.debug('en ejecución')
        return
for i in range(10):
    gth = GenThread()
    gth.start()
La salida del código anterior sería:
python derivada.py
[DEBUG] – Thread-1   : en ejecución
[DEBUG] – Thread-2   : en ejecución
[DEBUG] – Thread-3   : en ejecución
[DEBUG] – Thread-4   : en ejecución
[DEBUG] – Thread-5   : en ejecución
[DEBUG] – Thread-6   : en ejecución
[DEBUG] – Thread-7   : en ejecución
[DEBUG] – Thread-8   : en ejecución
[DEBUG] – Thread-9   : en ejecución
[DEBUG] – Thread-10  : en ejecución
La implementación del objeto Thread y su constructor no permite acceder a los parámetros pasados al constructor de forma sencilla desde una subclase. Si queremos pasar argumentos a nuestra clase, debemos redefinir el constructor y guardar los valores en una instancia visible para la subclase:
import threading
import logging
class GenThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose)
        self.args = args
        self.kwargs = kwargs
        return
    def run(self):
        logging.debug('en ejecución con parámetros %s y %s', self.args, self.kwargs)
        return
for i in range(10):
    gth = GenThread(args=(i,), kwargs={'g' : 'G', 'e' : 'E'})
    gth.start()
La salida del código anterior no es mucho más especial que la anterior:
[DEBUG] – Thread-1   : en ejecución con parámetros (0,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-2   : en ejecución con parámetros (1,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-3   : en ejecución con parámetros (2,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-4   : en ejecución con parámetros (3,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-5   : en ejecución con parámetros (4,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-6   : en ejecución con parámetros (5,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-7   : en ejecución con parámetros (6,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-8   : en ejecución con parámetros (7,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-9   : en ejecución con parámetros (8,) y {'e': 'E', 'g': 'G'}
[DEBUG] – Thread-10  : en ejecución con parámetros (9,) y {'e': 'E', 'g': 'G'}
Esta subclase es muy sencilla y comparte la API con su clase base Thread pero se puede crear una clase todo lo compleja que queramos para que cumpla con nuestra necesidades.

Threads con temporizador

Una de las razones para derivar una clase del objeto Thread queda perfectamente retratada en la clase Timer que también está incluido en el módulo threading y que nos provee de una manera de lanzar nuestros hilos tras un retraso que además puede ser cancelado en cualquier punto de ese retraso:
import threading
import time
import logging
def retraso():
    logging.debug('worker en ejecución')
    return
th1 = threading.Timer(2, restraso)
th1.setName('th1')
th2 = threading.Timer(2, restraso)
th2.setName('th2')
logging.debug('lanzando temporizadores')
th1.start()
th2.start()
logging.debug('esperando antes de cancelar a %s', th2.getName())
time.sleep(1)
logging.debug('cancelando a %s', th2.getName())
th2.cancel()
logging.debug('hecho')
En el ejemplo anterior el segundo Timer no debería ejecutarse nunca y el primero se ejecuta de forma aparente después del resto del programa. Al no ser un hilo daemon hace join de forma implícita cuando el MainThread termina.

python timer.py
[DEBUG] – MainThread : lanzando temporizadores
[DEBUG] – MainThread : esperando antes de cancelar a th2
[DEBUG] – MainThread : cancelando a th2
[DEBUG] – MainThread : hecho
[DEBUG] – th1        : worker en ejecución

En el próximo artículo de la serie seguiremos indagando cada vez más profúndamente en el multiprocesamiento tocando temas como la comunicación entre hilos y el acceso a los recursos.

En Genbeta Dev | Multiprocesamiento en Python

Portada de Genbeta