長時間相関解析といえば、DFA(Detrended Fluctuation Analysis) があまりにも有名です。さまざまな分野の論文で DFA が用いられており、そのため「長時間相関を調べるなら DFA」という認識が、半ば常識のように広まっています。
しかし実は、同じ考え方に基づく解析法として DMA(Detrended Moving Average Analysis)という方法があり、性質の面では DMA の方が優れている点があります。とくに重要なのは、スケーリング指数を推定する際に用いるゆらぎ関数の挙動です。DFA ではこの関数がガタガタしやすく、その結果、回帰に不要な誤差が入り込みがちです。一方、DMA ではゆらぎ関数がなめらかになりやすく、そのような問題は生じにくくなります。この意味で、DMA は DFA の改良版 と言ってよい方法です。
さらに、DMA にも DFA と同様に、高次のトレンド除去を可能にする 高次 DMA (Savitzky-Galayフィルタを使ったDMA:SG-DMA)が存在します。私は独自にSG-DMA の高速アルゴリズムを開発し、C 言語で実装して利用していますが、これは誰もが気軽に使えるものではありません。そのため、高次 DMA を試してみたいと思っても、どのように実装すればよいのかわからないという人が多いのが現状だと思います。
そこで今回は、高次 DMA を R で実装する方法を紹介します。これは私が用いている高速アルゴリズムではありませんが、最近のパソコンは計算速度が十分に速いため、長さがおよそ 程度の時系列であれば、数秒程度で計算できるはずです。
になると数十秒かかると思います(C言語の高速アルゴリズムなら秒かかりませんが)。今回紹介するプログラムがすごいのは、時系列の端の処理が追加されて、短い時系列でも精度が少し改善していることです。Savitzky-Galayフィルタのデフォルトの扱いと違い、端まできちんとトレンド計算しているので、一般的なSavitzky-Galayフィルタを使うよりも精度が高くなってます。
細かいことはどうでもいいと感じている人が多いと思いますので、とりあえず試してみてください。
1. まずは使ってみる
ここでは理屈の説明は省き、「どのスクリプトを用意し、どのコマンドを実行すればよいか」だけを説明します。やり方がわければ、
自分が解析したい時系列を
xに代入するだけ
でスケーリング指数を計算できます。
1-1.【R】最初にSG-DMA の本体関数を定義する
解析を始める前に、SG-DMA の本体関数を R に読み込んで定義しておく必要があります。具体的には、以下のスクリプト全体を 最初に実行してください。このスクリプトは、Rを起動した後に1回だけ実行する必要があります。
############################################################
# SG-DMA(Savitzky–Golay 型 DMA)
# 目的:
# x から平均を引いた系列 x0 に対し,局所窓で p 次多項式トレンドを除去した残差 eps を作り,
# F = sqrt(mean(eps^2)) を計算する(DMA の fluctuation)。
#
# プロセス:
# 1) 窓内のプロファイル y(累積和)に p 次多項式回帰 → 予測 y_hat
# 2) eps(target) = y(target) - y_hat(target) を y の線形結合 r で表す
# 3) y は x の累積和なので,eps を x の線形結合 w(FIR 重み)に変換
# 4) 中央部は固定窓の w を filter で一括適用(高速)
# 5) 端点は窓が変わるため点ごとに w を作って内積(ただし N>=1024 なら端点計算を省略)
############################################################
# ----------------------------------------------------------
# G v = b を解く
# ----------------------------------------------------------
safe_solve_linear <- function(G, b,
tol_qr = 1e-12,
lambda0 = 1e-12,
lambda_mult = 10,
lambda_tries = 6) {
v <- tryCatch(solve(G, b), error = function(e) NULL)
if (!is.null(v) && all(is.finite(v))) return(as.numeric(v))
v <- tryCatch(qr.solve(G, b, tol = tol_qr), error = function(e) NULL)
if (!is.null(v) && all(is.finite(v))) return(as.numeric(v))
I <- diag(ncol(G))
lambda <- lambda0
for (k in seq_len(lambda_tries)) {
Gr <- G + lambda * I
v <- tryCatch(solve(Gr, b), error = function(e) NULL)
if (!is.null(v) && all(is.finite(v))) return(as.numeric(v))
v <- tryCatch(qr.solve(Gr, b, tol = tol_qr), error = function(e) NULL)
if (!is.null(v) && all(is.finite(v))) return(as.numeric(v))
lambda <- lambda * lambda_mult
}
rep(NA_real_, length(b))
}
# ----------------------------------------------------------
# 窓内インデックス j_vec に対して,eps(target) = Σ w[k] x[k] の重み w を作る
# ----------------------------------------------------------
calc_sg_weights <- function(j_vec, p, target_idx, use_tail_sum = TRUE,
tol_qr = 1e-12, lambda0 = 1e-12) {
n <- length(j_vec)
if (n <= p) return(rep(NA_real_, n)) # 点数不足(p 次回帰に不適)
if (target_idx < 1 || target_idx > n) return(rep(NA_real_, n))
# デザイン行列 A(多項式基底)
A <- matrix(1.0, nrow = n, ncol = p + 1)
if (p >= 1) for (k in 1:p) A[, k + 1] <- j_vec^k
# 正規方程式 G v = b を解き,target 行の射影係数を得る
G <- crossprod(A)
b <- A[target_idx, ]
v <- safe_solve_linear(G, b, tol_qr = tol_qr, lambda0 = lambda0)
if (!all(is.finite(v))) return(rep(NA_real_, n))
h <- as.vector(A %*% v)
# eps(target) = (e_target - h)^T y を r として表す
r <- -h
r[target_idx] <- 1 + r[target_idx]
# y は x の累積和なので,x に対する重み w に変換
if (use_tail_sum) rev(cumsum(rev(r))) else r
}
# ----------------------------------------------------------
# SG-DMA 本体
# - N >= skip_edges_if_N_ge のとき端点計算を省略(eps 端は NA のまま)
# ----------------------------------------------------------
sg_dma_sg_fast <- function(x, M, p,
tol_qr = 1e-12, lambda0 = 1e-12,
skip_edges_if_N_ge = 1024) {
x <- as.numeric(x)
N <- length(x)
if (N < 2) stop("x の長さが短すぎます。")
if (M < 1) stop("M must be >= 1.")
if (p < 0) stop("p must be >= 0.")
if (N < 2 * M + 1) {
return(list(eps = rep(NA_real_, N), F2 = NA_real_, F = NA_real_))
}
# 平均除去
x0 <- x - mean(x, na.rm = TRUE)
# 残差(中央部は必ず計算)
eps <- rep(NA_real_, N)
# 中央部:固定窓の重みを作って filter で一括適用
j_local <- -M:M
w_local <- calc_sg_weights(j_local, p, target_idx = M + 1, use_tail_sum = TRUE,
tol_qr = tol_qr, lambda0 = lambda0)
if (!all(is.finite(w_local))) {
return(list(eps = eps, F2 = NA_real_, F = NA_real_))
}
eps_mid <- as.numeric(stats::filter(x0, w_local, sides = 2))
eps[(M + 1):(N - M)] <- eps_mid[(M + 1):(N - M)]
# N が十分長い場合は端点を計算しない(高速化)
if (N >= skip_edges_if_N_ge) {
F2 <- mean(eps^2, na.rm = TRUE)
return(list(eps = eps, F2 = F2, F = sqrt(F2)))
}
# 端点:点ごとに窓が変わるため,その都度重みを作って内積
max_left <- min(M, N)
if (max_left >= 1) {
T_max <- max_left + M
x_head <- x0[1:min(T_max, N)]
# 左端
for (n0 in 1:max_left) {
T_curr <- min(n0 + M, N)
w <- calc_sg_weights(1:T_curr, p, target_idx = n0, use_tail_sum = TRUE,
tol_qr = tol_qr, lambda0 = lambda0)
if (all(is.finite(w))) eps[n0] <- sum(w * x_head[1:T_curr])
}
# 右端(反転して左端と同じ処理)
x0_rev <- rev(x0)
x_head_rev <- x0_rev[1:min(T_max, N)]
for (n0_rev in 1:max_left) {
n_idx <- N - n0_rev + 1
if (is.na(eps[n_idx])) {
T_curr <- min(n0_rev + M, N)
w <- calc_sg_weights(1:T_curr, p, target_idx = n0_rev, use_tail_sum = TRUE,
tol_qr = tol_qr, lambda0 = lambda0)
if (all(is.finite(w))) eps[n_idx] <- sum(w * x_head_rev[1:T_curr])
}
}
}
F2 <- mean(eps^2, na.rm = TRUE)
list(eps = eps, F2 = F2, F = sqrt(F2))
}
以上のスクリプトでは、
- SG-DMA に必要な重みを計算する関数
calc_sg_weights() - 実際に SG-DMA を 1 スケール分実行する関数
sg_dma_sg_fast()
の 2つの関数 を定義しています。
上のスクリプトを実行した段階では、解析はまだ何も実行されません。あくまで、
「DMAの道具を R の中に用意した」
だけです。
実務的には、このコードを
- そのままスクリプトの先頭に貼り付けて実行する
- あるいは
sg_dma_functions.Rのようなファイルに保存し、
source("sg_dma_functions.R")
として読み込む
のどちらかで構いません。
この本体関数を一度定義してしまえば、
以降は 時系列 x を与えて SG-DMA を呼び出すだけで解析ができるようになります。
1-2.【R】DMAを実行
次は 「x がすでにある」 前提で解析を進めます。良く分からない人も、以下のスクリプトを実行すれば解析できます。
## 前提:
## 1) calc_sg_weights(), sg_dma_sg_fast() を先に実行して定義済み
## 2) x に解析したい時系列(numeric)が入っている
# --- 入力の確認 ---
x <- as.numeric(x)
N <- length(x)
# --- 設定(まずはこのままでOK)---
p_dma <- 2 # トレンド除去次数
n_scales <- 30 # スケール点数
s.min <- 15 # 回帰に使う最小スケール
s.max <- round(N / 20) * 2 + 1 # 最大スケール(奇数)
# --- スケール列(M と s=2M+1)---
M_min <- ceiling(p_dma / 2) + 1
M_max <- round(N / 8)
M <- sort(unique(round(exp(seq(log(M_min), log(M_max), length.out = n_scales)))))
s <- 2 * M + 1
# --- F(s) 計算 ---
F_vec <- vapply(M, function(Mi) sg_dma_sg_fast(x, Mi, p_dma)$F, numeric(1L))
# --- 回帰(指定レンジのみ)---
log10_s <- log10(s)
log10_F <- log10(F_vec)
ok <- (s >= s.min) & (s <= s.max) & is.finite(log10_s) & is.finite(log10_F)
fit <- lm(log10_F[ok] ~ log10_s[ok])
slope <- coef(fit)[2]
# --- 図と結果 ---
plot(log10_s, log10_F, xlab="log10(s)", ylab="log10 F(s)",
main=sprintf("SG-DMA (p=%d) slope=%.3f fit[%d,%d]", p_dma, slope, s.min, s.max))
abline(v=c(log10(s.min), log10(s.max)), lty=2, col=gray(0.6))
points(log10_s[ok], log10_F[ok], pch=16, col=4)
abline(fit, lty=2, col=2)
cat(sprintf("Estimated slope = %.6f (n=%d points)\n", slope, sum(ok)))
このスクリプトを実行すれば、[tex: \log{10} s] と [tex: \log{10} F(s)] のグラフが描かれ、
Estimated slope = 0.775978 (n=19 points)
のように、スケーリング指数の推定値「0.775978」が表示されます。
■ 解析設定:p_dma, n_scales, [s.min, s.max]
上のスクリプトで注目する(調整する)以下の項目について説明します。
p_dma <- 2 n_scales <- 30 s.min <- 15 s.max <- round(N / 20) * 2 + 1
これらはすべて、スケーリング指数の推定結果を左右する重要な設定です。
p_dma:トレンド除去の次数
SG-DMA は、時系列に含まれる多項式トレンドからの残差を評価する方法です。p_dma は、その 多項式の次数を指定します。
p_dma = 0→ 定数(平均レベル)と直線トレンドまで除去p_dma = 2→ 3次まで除去
奇数次の p_dma は、一つ値の小さい偶数次の p_dma と結果が同じになりますので、基本が偶数を設定して下さい。つまり、1次と0次は同じ結果、2次と1次は同じ結果になります。多くの場合、p_dma <- 2 で良いと思います。
n_scales:グラフを描く点数
n_scales は、グラフに描く点の数を調整します。大きい値にすると計算に時間がかかるので、グラフにしたときにマークが重ならないくらいの数を用意すれば良いと思います。
s.min, s.max:回帰に使うスケール範囲(最重要)
スケーリング指数を推定するレンジを指定します。そのため、
- log–log プロットを描いたうえで
- 直線に見える範囲だけを選んで回帰する
という操作を行います。その範囲が [s.min, s.max] です。
現在はスケーリング領域が広いと仮定して、
s.min <- 15 s.max <- round(N / 20) * 2 + 1
としています。s.max は、大きくても時系列の長さの1/10くらいにしてください。s.min は、スケーリング指数が0.5以上であれば、
s.min <- 10
くらいでいいですが、0.5より小さいときは、
s.min <- 15
か、これよりも大きく設定してください。

N_sim <- 1000 としました。
2.【R】DMAの数値実験
以下は、longmemo パッケージを使って、非整数ガウスノイズのサンプルを生成し、スケーリング指数を推定するスクリプトです。このスクリプトは手持ちのデータがなくでも解析を実行することができます。
############################################################
# SG-DMA (Savitzky–Golay type DMA) demo script
#
# 目的:
# 1)【サンプル生成部】例として fGn(Fractional Gaussian Noise)を x に代入
# 2)【DMA解析部】x を入力として SG-DMA によりスケーリング指数 slope を推定
#
# 使い方:
# - 自分のデータを解析したい場合は、【サンプル生成部】を削除し、
# x <- (あなたの時系列ベクトル)
# とするだけで同じ解析が実行できる。
############################################################
# ==========================================================
# 0. ユーティリティ
# ==========================================================
fmt_sec <- function(sec) {
if (is.na(sec)) return(NA_character_)
if (sec < 1) sprintf("%.3f s", sec)
else if (sec < 60) sprintf("%.2f s", sec)
else sprintf("%.2f min", sec / 60)
}
# ==========================================================
# A) サンプル時系列の生成(例:fGn)
# ※ここは「例」です。解析したいデータがある場合は、
# x に時系列を代入する部分だけ残してください。
# ==========================================================
# install.packages("longmemo") # 未導入の場合
require(longmemo)
H_true <- 0.8 # fGn の真の Hurst 指数(例)
N_sim <- 1000 # 生成する時系列長(例)
tm_sim <- system.time({
x <- simFGN0(N_sim, H_true) # ★例:生成した時系列を x に代入
})
cat("[Sample] fGn generation: ", fmt_sec(tm_sim["elapsed"]),
" (N = ", length(x), ", H = ", H_true, ")\n", sep = "")
# ==========================================================
# B) DMA 解析(ここから先は「x が与えられている」ことだけを前提に動く)
# ==========================================================
# ---- DMA 設定(解析に関わるパラメータ)----
p_dma <- 2 # 多項式次数 p
n_scales <- 30 # スケール点数(length.out も同値)
# M の最小・最大を決めるルール(必要に応じて調整)
M_min_rule <- function(p) ceiling(p / 2) + 1
M_max_rule <- function(N) round(N / 8)
# スケーリングレンジ [s.min, s.max](この範囲だけで slope を推定)
# 例:s.min は十分小さすぎない値,s.max は N に対して大きすぎない値
s.min <- 10
# x の長さから自動で s.max を決める例(奇数になるよう調整)
N <- length(x) # ★ x の長さをここで取得して以降に使う
s.max <- round(N / 20) * 2 + 1
# ==========================================================
# 解析の計測開始
# ==========================================================
t0_all <- Sys.time()
cat("\n---- SG-DMA analysis (timing) ----\n")
cat("Start:", format(t0_all, "%Y-%m-%d %H:%M:%S"), "\n\n")
# ==========================================================
# 1) スケール系列(M と s=2M+1)を作る
# ==========================================================
tm_scale <- system.time({
M_min <- M_min_rule(p_dma)
M_max <- M_max_rule(N)
M <- unique(round(exp(seq(
log(M_min),
log(M_max),
length.out = n_scales
))))
M <- sort(M)
s <- 2 * M + 1
})
cat("[1] Scale grid build: ", fmt_sec(tm_scale["elapsed"]),
" (p = ", p_dma,
", n_scales = ", length(M),
", M range = [", min(M), ", ", max(M), "])\n", sep = "")
# ==========================================================
# 2) 各スケールで F(s) を計算
# ==========================================================
get_F_for_M <- function(Mi) {
res <- sg_dma_sg_fast(x, Mi, p_dma)
res$F
}
tm_F <- system.time({
F_vec <- vapply(M, get_F_for_M, numeric(1L))
})
cat("[2] F(s) computation: ", fmt_sec(tm_F["elapsed"]),
" (per scale ≈ ", fmt_sec(tm_F["elapsed"] / length(M)), ")\n", sep = "")
dma_df <- data.frame(
M = M,
s = s,
F = F_vec,
log10_s = log10(s),
log10_F = log10(F_vec)
)
# ==========================================================
# 3) 指定したスケーリングレンジ [s.min, s.max] だけで回帰
# ==========================================================
in_range <- (dma_df$s >= s.min) & (dma_df$s <= s.max) &
is.finite(dma_df$log10_s) & is.finite(dma_df$log10_F)
dma_fit_df <- dma_df[in_range, , drop = FALSE]
if (nrow(dma_fit_df) < 2) {
stop("指定した [s.min, s.max] に回帰に十分な点がありません。s.min/s.max を見直してください。")
}
# ==========================================================
# 4) 回帰・描画
# ==========================================================
tm_plot <- system.time({
plot(dma_df$log10_s, dma_df$log10_F,
xlab = "log10(s)",
ylab = "log10 F(s)",
main = sprintf("SG-DMA (p = %d), fit range: [%g, %g]", p_dma, s.min, s.max))
# フィット範囲を縦線で表示
abline(v = c(log10(s.min), log10(s.max)), lty = 2, col = gray(0.6))
# フィットに使う点を強調表示
points(dma_fit_df$log10_s, dma_fit_df$log10_F, pch = 16, col = 4)
# 指定レンジ内のみで回帰
lfit <- lm(log10_F ~ log10_s, data = dma_fit_df)
abline(lfit, col = 2, lty = 2)
slope <- coef(lfit)[2]
usr <- par("usr")
text(usr[1] + 0.05 * diff(usr[1:2]),
usr[4] - 0.05 * diff(usr[3:4]),
sprintf("slope (fit in range) = %.3f\nn = %d points", slope, nrow(dma_fit_df)),
adj = c(0, 1))
})
cat("[3] Regression + plot: ", fmt_sec(tm_plot["elapsed"]), "\n", sep = "")
# ==========================================================
# 5) 終了サマリー表示
# ==========================================================
t1_all <- Sys.time()
elapsed_all <- as.numeric(difftime(t1_all, t0_all, units = "secs"))
summary_msg <- sprintf(
paste0(
"\n========== SG-DMA summary ==========\n",
"End time : %s\n",
"Total elapsed : %s\n",
"\n",
"--- Input data ---\n",
"Time series length N : %d\n",
"\n",
"--- DMA settings ---\n",
"Polynomial order p : %d\n",
"Number of scales : %d\n",
"M range : [%d, %d]\n",
"Scaling range s : [%d, %d] (n = %d points)\n",
"\n",
"--- Scaling result ---\n",
"Estimated slope : %.4f\n",
"====================================\n"
),
format(t1_all, "%Y-%m-%d %H:%M:%S"),
fmt_sec(elapsed_all),
N,
p_dma,
length(M),
min(M), max(M),
s.min, s.max,
nrow(dma_fit_df),
coef(lfit)[2]
)
cat(summary_msg)
■ スクリプトの説明
1) fmt_sec(sec):処理時間を見やすく表示する関数
fmt_sec <- function(sec) { if (is.na(sec)) return(NA_character_) if (sec < 1) sprintf("%.3f s", sec) else if (sec < 60) sprintf("%.2f s", sec) else sprintf("%.2f min", sec / 60) }
system.time() の "elapsed"(経過時間:秒)を受け取り、状況に応じて
- 1秒未満:ミリ秒相当まで(小数3桁)で
"0.123 s" - 1〜60秒:小数2桁で
"12.34 s" - 60秒以上:分表示で
"1.23 min"
のように整形して返します。
2) M_min_rule(p):最小スケール(半窓幅 M)の下限を決めるルール
M_min_rule <- function(p) ceiling(p / 2) + 1
多項式次数 p を与えると、「窓が小さすぎて高次のトレンド除去が不安定になる」事態を避けるために、M の最小値 M_min を自動で決めます。
pが大きいほど、窓内で当てはめる多項式は複雑になり、ある程度の点数(窓幅)が必要になります。- そこで、簡単な経験則として [ M_{\min} = \lceil p/2 \rceil + 1 ] を使っています。
この式は「理論上の厳密条件」というより、危険な小窓を避けるための安全策です。必要ならユーザーが変更して構いませんが、まずはこのままが無難です。
3) M_max_rule(N):最大スケール(半窓幅 M)の上限を決めるルール
M_max_rule <- function(N) round(N / 8)
時系列長 N から、M の最大値 M_max を決めます。窓が大きすぎると、
- 中央部で「完全な窓」を使える点数が減る
- 推定が不安定になりやすい
- 端点処理の影響が相対的に大きくなる
といった問題が出ます。そのため、スケールの上限を N/8 程度に制限しています。
これも「絶対の正解」ではなく、大窓を使いすぎないための経験則です。
データが十分長い場合は N/10 や N/6 などに調整しても構いません。
4) get_F_for_M(Mi):指定したスケールで SG-DMA を1回実行し、F だけ返す関数
get_F_for_M <- function(Mi) { res <- sg_dma_sg_fast(x, Mi, p_dma) res$F }
sg_dma_sg_fast() は結果をリストで返します(eps, F2, F)。スケーリング解析では通常、必要なのは 各スケールの F(s) だけです。
そこでこの関数は、
- 半窓幅
Miを受け取って SG-DMA を実行し res$F(揺らぎの大きさ)だけを取り出して返す
というラッパーになっています。
3. まとめ
RスクリプトでDMAを実行する方法を紹介しました。もっと高速な方が良ければC言語のプログラムを提供しますので連絡してください。あるいは、Rcppなどを使ってさらに高速なものを作ってパッケージとして世界に配布してもらえれば嬉しいです。
参考文献は以下です。
Detrending moving average algorithm: Frequency response and scaling performances Anna Carbone & Ken Kiyono Physical Review E 93, 063309 (2016). DMA の基本的性質と周波数応答、スケーリング性能について解析的に詳述した代表的論文です。
Fast algorithm for scaling analysis with higher-order detrending moving average method Yutaka Tsujimoto, Yuki Miki, Satoshi Shimatani & Ken Kiyono Physical Review E 93, 053304 (2016). SG 型の高次 DMA(高次トレンド除去)に対して、高速アルゴリズムを提案した論文です。
Time and frequency domain characteristics of detrending-operation-based scaling analysis: Exact DFA and DMA frequency responses Ken Kiyono & Yutaka Tsujimoto Physical Review E 94, 012111 (2016). DFA と DMA を同一フレームワークで捉え、周波数領域・時間領域双方の特徴を精密に比較した論文です。
Nonlinear filtering properties of detrended fluctuation analysis Ken Kiyono & Yutaka Tsujimoto Physica A 462, 807–815 (2016). DFA(およびそれに関連するトレンド除去演算)の非線形フィルタ特性を評価した論文で、トレンド除去法全般の理論的理解に役立ちます。
※ もし記事の中で「ここ違うよ」という点や気になるところがあれば、気軽に指摘していただけると助かります。質問や「このテーマも取り上げてほしい」といったリクエストも大歓迎です。必ず対応するとは約束できませんが、できるだけ今後の記事で扱いたいと思います。それと、下のはてなブログランキングはあまり信用できる指標ではなさそうですが (私のブログを読んでいる人は、実際とても少ないです)、押してもらえるとシンプルに励みになります。気が向いたときにポチッとしていただけたら嬉しいです。