Como faço para paralelizar um simples ciclo Python?

esta é provavelmente uma pergunta trivial, mas como faço para paralelizar o seguinte loop em python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

eu sei como começar uma única linha em Python, mas não sei como "recolher" os resultados.

múltiplos processos também seriam bons - o que for mais fácil para este caso. Eu estou usando atualmente Linux, mas o código deve ser executado no Windows e Mac também-bem.

Qual é a maneira mais fácil de paralelizar este código?

Author: Aaron Hall, 2012-03-20

9 answers

Usar vários tópicos no CPython não lhe dará melhor desempenho para o código Python puro devido ao Global interpreter lock (GIL). Sugiro usar o multiprocessing módulo em vez disso:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Note que isto não vai funcionar no interpretador interactivo.

Para evitar o FUD habitual em torno do GIL: não haveria qualquer vantagem em usar threads para este exemplo de qualquer maneira. Você quer usar processos aqui, não threads, porque eles evitam um monte de problema.

 113
Author: Sven Marnach, 2015-06-17 17:14:32

Para paralelizar um simples para o loop, joblib traz muito valor ao uso bruto de multiprocessamento. Não só a sintaxe curta, mas também coisas como o agrupamento transparente de iterações quando eles são muito rápidos (para remover a parte superior) ou captura do traceback do processo infantil, para ter melhor relatório de erro.

Disclaimer: eu sou o autor original de joblib.

 33
Author: Gael Varoquaux, 2018-08-08 19:01:03
Qual é a maneira mais fácil de paralelizar este código?

Eu realmente gosto de concurrent.futures para isto, Disponível em Python3 desde a versão 3.2 - e via backport para 2.6 e 2.7 em PyPi .

Você pode usar threads ou processos e usar a mesma interface exata.

Multiprocessamento

Põe isto num ficheiro ... futuretest.py:
import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

E aqui está a saída:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Multithreading

Agora muda ProcessPoolExecutor para ThreadPoolExecutor, e corre o módulo novamente:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Agora você fez tanto multithreading quanto multiprocessamento!

Nota sobre o desempenho e usar ambos em conjunto.

A amostragem é demasiado pequena para comparar os resultados.

No entanto, suspeito que o multithreading será mais rápido do que o multiprocessamento em geral, especialmente no Windows, uma vez que o Windows não suporta bifurcação, por isso cada novo processo tem de levar tempo para ser lançado. Em Linux ou Mac eles provavelmente estarão mais perto.

Você pode nidificar múltiplos threads dentro de vários processos, mas é recomendado não usar vários threads para derivar vários processos.

 20
Author: Aaron Hall, 2017-05-05 16:28:38
Porque não usas threads e um mutex para proteger uma lista global?
import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data
Tenha em mente que será tão rápido quanto o seu fio mais lento.
 3
Author: jackdoe, 2012-03-20 11:54:59

Há uma série de vantagens em usar Ray:

  • Você pode paralelizar sobre várias máquinas além de múltiplos núcleos (com o mesmo código).
  • tratamento eficiente dos dados numéricos através da memória partilhada (e serialização de cópia nula).
  • alta capacidade de processamento com programação distribuída.
  • Tolerância à falha.
No seu caso, pode iniciar o Ray e definir uma função remota.
import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3
E depois invocá-lo em paralelo
output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
Para executar o mesmo exemplo em um aglomerado, a única linha que mudaria seria a chamada para ray.inicial(). A documentação relevante pode ser encontrada aqui . Repare que estou a ajudar a desenvolver o Ray.
 2
Author: Robert Nishihara, 2018-08-30 06:49:54

Isto pode ser útil na implementação de multiprocessamento e computação paralela / distribuída em Python.

Tutorial do YouTube sobre o uso do pacote techila

Techila é um middleware de computação distribuída, que se integra diretamente com Python usando o pacote techila. A função peach no pacote pode ser útil em estruturas paralelas de loop. (A seguir o excerto do código é dos fóruns comunitários de Techila)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
 1
Author: TEe, 2015-10-22 11:30:45
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

O acima funciona lindamente na minha máquina (Ubuntu, joblib pacote foi pré-instalado, mas pode ser instalado via pip install joblib).

Retirado de https://blog.dominodatalab.com/simple-parallelization/

 1
Author: tyrex, 2018-06-19 10:31:00

Um exemplo muito simples de processamento paralelo é

from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()

def yourfunction():

    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter = parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
 0
Author: Adil Warsi, 2018-05-10 06:56:17
Olha para isto.

Http://docs.python.org/library/queue.html

Esta pode não ser a maneira certa de o fazer, mas eu faria algo do género:

Código Actual;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()
Espero que isso ajude.
 -1
Author: MerreM, 2012-03-20 12:16:20