calcular a média de movimento exponencial em python
então, dado o seguinte código, como poderia calcular a média ponderada móvel de pontos de QI para o calendário encontros?
from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]
(provavelmente há uma maneira melhor de estruturar os dados, qualquer conselho seria apreciado)
11 answers
Editar:
Parece que mov_average_expw()
função de scikits.timeseries.dado.moving_functs submódulo de SciKits (conjuntos de ferramentas add-on que complementam SciPy) melhor se adequa ao texto da sua pergunta.
Para calcular umsuavização exponencial dos seus dados com um factor de suavização alpha
(é (1 - alpha)
nos termos da Wikipédia):
>>> alpha = 0.5
>>> assert 0 < alpha <= 1.0
>>> av = sum(alpha**n.days * iq
... for n, iq in map(lambda (day, iq), today=max(days): (today-day, iq),
... sorted(zip(days, IQ), key=lambda p: p[0], reverse=True)))
95.0
[7]o acima não é bonito, então vamos refacturá-lo um bit:
from collections import namedtuple
from operator import itemgetter
def smooth(iq_data, alpha=1, today=None):
"""Perform exponential smoothing with factor `alpha`.
Time period is a day.
Each time period the value of `iq` drops `alpha` times.
The most recent data is the most valuable one.
"""
assert 0 < alpha <= 1
if alpha == 1: # no smoothing
return sum(map(itemgetter(1), iq_data))
if today is None:
today = max(map(itemgetter(0), iq_data))
return sum(alpha**((today - date).days) * iq for date, iq in iq_data)
IQData = namedtuple("IQData", "date iq")
if __name__ == "__main__":
from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]
iqdata = list(map(IQData, days, IQ))
print("\n".join(map(str, iqdata)))
print(smooth(iqdata, alpha=0.5))
Exemplo:
$ python26 smooth.py
IQData(date=datetime.date(2008, 1, 1), iq=110)
IQData(date=datetime.date(2008, 1, 2), iq=105)
IQData(date=datetime.date(2008, 1, 7), iq=90)
95.0
def ema(s, n):
"""
returns an n period exponential moving average for
the time series s
s is a list ordered from oldest (index 0) to most
recent (index -1)
n is an integer
returns a numeric array of the exponential
moving average
"""
s = array(s)
ema = []
j = 1
#get n sma first and calculate the next n period ema
sma = sum(s[:n]) / n
multiplier = 2 / float(1 + n)
ema.append(sma)
#EMA(current) = ( (Price(current) - EMA(prev) ) x Multiplier) + EMA(prev)
ema.append(( (s[n] - sma) * multiplier) + sma)
#now calculate the rest of the values
for i in s[n+1:]:
tmp = ( (i - ema[j]) * multiplier) + ema[j]
j = j + 1
ema.append(tmp)
return ema
Aqui está um exemplo de como fazê-lo:
import pandas as pd
import numpy as np
def ema(values, period):
values = np.array(values)
return pd.ewma(values, span=period)[-1]
values = [9, 5, 10, 16, 5]
period = 5
print ema(values, period)
Mais informações sobre Pandas EWMA:
Http://pandas.pydata.org/pandas-docs/stable/generated/pandas.ewma.html
A minha python está um pouco enferrujada (qualquer um pode se sentir livre para editar este código para fazer correções, se eu estraguei a sintaxe de alguma forma), mas aqui vai....
def movingAverageExponential(values, alpha, epsilon = 0):
if not 0 < alpha < 1:
raise ValueError("out of range, alpha='%s'" % alpha)
if not 0 <= epsilon < alpha:
raise ValueError("out of range, epsilon='%s'" % epsilon)
result = [None] * len(values)
for i in range(len(result)):
currentWeight = 1.0
numerator = 0
denominator = 0
for value in values[i::-1]:
numerator += value * currentWeight
denominator += currentWeight
currentWeight *= alpha
if currentWeight < epsilon:
break
result[i] = numerator / denominator
return result
Esta função move-se para trás, do fim da lista ao início, calculando a média exponencial móvel para cada valor trabalhando para trás até que o coeficiente de peso para um elemento seja inferior ao epsilon dado.
No final da função, inverte os valores antes de devolver a lista (so que eles estão na ordem correta para o chamador).
(nota lateral: se eu estivesse usando uma linguagem diferente do python, eu criaria um array vazio de tamanho completo primeiro e então preencheria a ordem invertida, de modo que eu não teria que revertê-la no final. Mas acho que não se pode declarar uma grande matriz vazia em python. E nas listas python, adicionar é muito menos caro do que pré-adicionar, e é por isso que eu construí a lista em ordem inversa. Por favor, corrija-me se estiver errado.)
O argumento 'alfa' é o factor de degradação em cada iteração. Por exemplo, se você usou um alfa de 0,5, então o valor médio móvel de hoje seria composto pelos seguintes valores ponderados:
today: 1.0
yesterday: 0.5
2 days ago: 0.25
3 days ago: 0.125
...etc...
É claro, se você tem uma grande variedade de valores, os valores de dez ou quinze dias atrás não vão contribuir muito para a média ponderada de hoje. O argumento 'epsilon' permite-lhe definir um ponto de corte, abaixo do qual deixará de se preocupar com valores antigos (uma vez que a sua contribuição para o valor de hoje será insignificante).
Invocaria a função algo assim:
result = movingAverageExponential(values, 0.75, 0.0001)
Em matplotlib.org exemplos (http://matplotlib.org/examples/pylab_examples/finance_work2.html) é fornecido um bom exemplo de Média móvel Exponencial (EMA) função usando o numpy:
def moving_average(x, n, type):
x = np.asarray(x)
if type=='simple':
weights = np.ones(n)
else:
weights = np.exp(np.linspace(-1., 0., n))
weights /= weights.sum()
a = np.convolve(x, weights, mode='full')[:len(x)]
a[:n] = a[n]
return a
Eu não conheço Python, mas para a parte média, você quer dizer um filtro de baixa passagem exponencialmente decadente do formulário
y_new = y_old + (input - y_old)*alpha
Onde Alfa = dt/tau, dt = o timestep do filtro, tau = a constante de tempo do filtro? (a forma variável-timestep deste é como se segue, apenas clipe dt/tau para não ser mais de 1.0)
y_new = y_old + (input - y_old)*dt/tau
Se quiser filtrar algo como uma data, certifique-se de converter para uma quantidade de ponto flutuante como # de segundos Desde 1 de Janeiro de 1970.
Você também pode usar o método do filtro SciPy porque o EMA é um filtro IIR. Isto terá o benefício de ser aproximadamente 64 vezes mais rápido, conforme medido no meu sistema usando timeit em grandes conjuntos de dados quando comparado com a abordagem enumerate().
import numpy as np
from scipy.signal import lfilter
x = np.random.normal(size=1234)
alpha = .1 # smoothing coefficient
zi = [x[0]] # seed the filter state with first value
# filter can process blocks of continuous data if <zi> is maintained
y, zi = lfilter([1.-alpha], [1., -alpha], x, zi=zi)
Eu achei o excerto de código acima por @earino bastante útil - mas eu precisava de algo que pudesse continuamente suavizar um fluxo de valores - então eu refactori-lo para isto:
def exponential_moving_average(period=1000):
""" Exponential moving average. Smooths the values in v over ther period. Send in values - at first it'll return a simple average, but as soon as it's gahtered 'period' values, it'll start to use the Exponential Moving Averge to smooth the values.
period: int - how many values to smooth over (default=100). """
multiplier = 2 / float(1 + period)
cum_temp = yield None # We are being primed
# Start by just returning the simple average until we have enough data.
for i in xrange(1, period + 1):
cum_temp += yield cum_temp / float(i)
# Grab the timple avergae
ema = cum_temp / period
# and start calculating the exponentially smoothed average
while True:
ema = (((yield ema) - ema) * multiplier) + ema
E uso - o assim:
def temp_monitor(pin):
""" Read from the temperature monitor - and smooth the value out. The sensor is noisy, so we use exponential smoothing. """
ema = exponential_moving_average()
next(ema) # Prime the generator
while True:
yield ema.send(val_to_temp(pin.read()))
(em que pino.read () produz o próximo valor que eu gostaria de consumir).
def emaWeight(numSamples):
return 2 / float(numSamples + 1)
def ema(close, prevEma, numSamples):
return ((close-prevEma) * emaWeight(numSamples) ) + prevEma
samples = [
22.27, 22.19, 22.08, 22.17, 22.18, 22.13, 22.23, 22.43, 22.24, 22.29,
22.15, 22.39, 22.38, 22.61, 23.36, 24.05, 23.75, 23.83, 23.95, 23.63,
23.82, 23.87, 23.65, 23.19, 23.10, 23.33, 22.68, 23.10, 22.40, 22.17,
]
emaCap = 10
e=samples[0]
for s in range(len(samples)):
numSamples = emaCap if s > emaCap else s
e = ema(samples[s], e, numSamples)
print e
Um caminho rápido (copy-pasted from here) é o seguinte:
def ExpMovingAverage(values, window):
""" Numpy implementation of EMA
"""
weights = np.exp(np.linspace(-1., 0., window))
weights /= weights.sum()
a = np.convolve(values, weights, mode='full')[:len(values)]
a[:window] = a[window]
return a
def expma(aseries, ratio):
return sum([ratio*aseries[-x-1]*((1-ratio)**x) for x in range(len(aseries))])