Como faço para paralelizar um simples ciclo Python?
esta é provavelmente uma pergunta trivial, mas como faço para paralelizar o seguinte loop em python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
eu sei como começar uma única linha em Python, mas não sei como "recolher" os resultados.
múltiplos processos também seriam bons - o que for mais fácil para este caso. Eu estou usando atualmente Linux, mas o código deve ser executado no Windows e Mac também-bem.
Qual é a maneira mais fácil de paralelizar este código?9 answers
Usar vários tópicos no CPython não lhe dará melhor desempenho para o código Python puro devido ao Global interpreter lock (GIL). Sugiro usar o multiprocessing
módulo em vez disso:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Note que isto não vai funcionar no interpretador interactivo.
Para evitar o FUD habitual em torno do GIL: não haveria qualquer vantagem em usar threads para este exemplo de qualquer maneira. Você quer usar processos aqui, não threads, porque eles evitam um monte de problema.
Para paralelizar um simples para o loop, joblib traz muito valor ao uso bruto de multiprocessamento. Não só a sintaxe curta, mas também coisas como o agrupamento transparente de iterações quando eles são muito rápidos (para remover a parte superior) ou captura do traceback do processo infantil, para ter melhor relatório de erro.
Disclaimer: eu sou o autor original de joblib.
Qual é a maneira mais fácil de paralelizar este código?
Eu realmente gosto de concurrent.futures
para isto, Disponível em Python3 desde a versão 3.2 - e via backport para 2.6 e 2.7 em PyPi .
Você pode usar threads ou processos e usar a mesma interface exata.
Multiprocessamento
Põe isto num ficheiro ... futuretest.py:import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don't supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let's see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f'original inputs: {repr(output1)}')
print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
print(f'returned in order given: {repr(output3)}')
if __name__ == '__main__':
main()
E aqui está a saída:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Multithreading
Agora muda ProcessPoolExecutor
para ThreadPoolExecutor
, e corre o módulo novamente:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Agora você fez tanto multithreading quanto multiprocessamento!
Nota sobre o desempenho e usar ambos em conjunto.
A amostragem é demasiado pequena para comparar os resultados.
No entanto, suspeito que o multithreading será mais rápido do que o multiprocessamento em geral, especialmente no Windows, uma vez que o Windows não suporta bifurcação, por isso cada novo processo tem de levar tempo para ser lançado. Em Linux ou Mac eles provavelmente estarão mais perto.Você pode nidificar múltiplos threads dentro de vários processos, mas é recomendado não usar vários threads para derivar vários processos.
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
Tenha em mente que será tão rápido quanto o seu fio mais lento.
Há uma série de vantagens em usar Ray:
- Você pode paralelizar sobre várias máquinas além de múltiplos núcleos (com o mesmo código).
- tratamento eficiente dos dados numéricos através da memória partilhada (e serialização de cópia nula).
- alta capacidade de processamento com programação distribuída. Tolerância à falha.
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
E depois invocá-lo em paralelo
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
Para executar o mesmo exemplo em um aglomerado, a única linha que mudaria seria a chamada para ray.inicial(). A documentação relevante pode ser encontrada aqui .
Repare que estou a ajudar a desenvolver o Ray.
Isto pode ser útil na implementação de multiprocessamento e computação paralela / distribuída em Python.
Tutorial do YouTube sobre o uso do pacote techila
Techila é um middleware de computação distribuída, que se integra diretamente com Python usando o pacote techila. A função peach no pacote pode ser útil em estruturas paralelas de loop. (A seguir o excerto do código é dos fóruns comunitários de Techila)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
from joblib import Parallel, delayed
import multiprocessing
inputs = range(10)
def processInput(i):
return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
O acima funciona lindamente na minha máquina (Ubuntu, joblib pacote foi pré-instalado, mas pode ser instalado via pip install joblib
).
Retirado de https://blog.dominodatalab.com/simple-parallelization/
Um exemplo muito simples de processamento paralelo é
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
Http://docs.python.org/library/queue.html
Esta pode não ser a maneira certa de o fazer, mas eu faria algo do género:Código Actual;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
Espero que isso ajude.