diff --git a/arch/tests/test_har.py b/arch/tests/test_har.py new file mode 100644 index 0000000000..98e7c88beb --- /dev/null +++ b/arch/tests/test_har.py @@ -0,0 +1,312 @@ +"""Tests for the HAR-RV model (arch.univariate.har).""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest +from numpy.testing import assert_allclose + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +RNG = np.random.default_rng(12345) +N = 500 # number of observations + + +def make_rv(n: int = N, seed: int = 42) -> pd.Series: + """Generate a synthetic realized-variance series (non-negative AR(1)-like).""" + rng = np.random.default_rng(seed) + eps = rng.standard_normal(n) + rv = np.abs(eps) ** 2 # chi-squared(1) proxy for RV + # Add a mild AR(1) structure to make it more realistic + for t in range(1, n): + rv[t] = 0.6 * rv[t - 1] + 0.4 * np.abs(eps[t]) ** 2 + return pd.Series(rv, name="RV") + + +# Pre-compute a shared RV series for tests that don't need isolation +RV_SERIES = make_rv() + + +# --------------------------------------------------------------------------- +# Import under PYTHONPATH (works whether arch-har is on sys.path or installed) +# --------------------------------------------------------------------------- + +import sys +import os + +# Ensure the cloned repo is importable +_REPO = os.path.join(os.path.dirname(__file__), "..", "..") +if _REPO not in sys.path: + sys.path.insert(0, os.path.abspath(_REPO)) + +from arch.univariate.har import HAR, HARResult # noqa: E402 + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestHARFit: + """Tests for HAR.fit().""" + + def test_har_fit_returns_result_object(self): + """fit() should return a HARResult instance.""" + model = HAR(RV_SERIES) + result = model.fit() + assert isinstance(result, HARResult) + + def test_har_params_shape(self): + """Default HAR should have 4 parameters: const + daily + weekly + monthly.""" + model = HAR(RV_SERIES) + result = model.fit() + assert result.params_.shape == (4,), ( + f"Expected 4 params, got {result.params_.shape}" + ) + assert result.param_names == ["const", "daily", "weekly", "monthly"] + + def test_har_rsquared_in_range(self): + """R-squared must be in [0, 1] for realistic data.""" + model = HAR(RV_SERIES) + result = model.fit() + r2 = result.rsquared + assert 0.0 <= r2 <= 1.0, f"R-squared out of range: {r2}" + + def test_har_fitted_values_shape(self): + """predict() must return a Series of length (n - max_lag).""" + model = HAR(RV_SERIES) + result = model.fit() + fitted = result.predict() + expected_len = len(RV_SERIES) - 22 # max_lag = 22 + assert len(fitted) == expected_len, ( + f"Fitted length {len(fitted)} != expected {expected_len}" + ) + + def test_har_forecast_shape(self): + """forecast(horizon=h) should return an array of length h.""" + model = HAR(RV_SERIES) + result = model.fit() + for h in [1, 5, 22]: + fc = result.forecast(horizon=h) + assert len(fc) == h, f"Forecast length {len(fc)} != horizon {h}" + + def test_har_pvalues_in_range(self): + """HAC p-values must lie in [0, 1].""" + model = HAR(RV_SERIES) + result = model.fit() + pvals = result.pvalues + assert (pvals >= 0.0).all(), "Some p-values are negative" + assert (pvals <= 1.0).all(), "Some p-values exceed 1" + + def test_har_summary_runs(self): + """summary() should not raise and should return a non-empty string.""" + model = HAR(RV_SERIES) + result = model.fit() + s = result.summary() + assert isinstance(s, str) and len(s) > 0 + + def test_har_summary_contains_key_info(self): + """summary() should mention R-squared and param names.""" + model = HAR(RV_SERIES) + result = model.fit() + s = result.summary() + assert "R-squared" in s + assert "const" in s + assert "daily" in s + + +class TestHARCustomLags: + """Tests for HAR with non-default lag specifications.""" + + def test_har_custom_lags_two(self): + """HAR with lags=[1, 10] should produce 3 params (const + 2 betas).""" + model = HAR(RV_SERIES, lags=[1, 10]) + result = model.fit() + assert result.params_.shape == (3,), ( + f"Expected 3 params, got {result.params_.shape}" + ) + assert result.param_names == ["const", "daily", "rv_lag10"] + + def test_har_custom_lags_single(self): + """HAR with a single lag [1] should produce 2 params.""" + model = HAR(RV_SERIES, lags=[1]) + result = model.fit() + assert result.params_.shape == (2,) + + def test_har_custom_lags_rsquared(self): + """Custom lags should still give a valid R-squared.""" + model = HAR(RV_SERIES, lags=[1, 10]) + result = model.fit() + assert 0.0 <= result.rsquared <= 1.0 + + def test_har_lags_sorted_internally(self): + """HAR should sort lags so max_lag is used correctly.""" + model1 = HAR(RV_SERIES, lags=[22, 1, 5]) + model2 = HAR(RV_SERIES, lags=[1, 5, 22]) + res1 = model1.fit() + res2 = model2.fit() + assert_allclose(res1.params_, res2.params_, rtol=1e-10) + + +class TestHARAgainstOLS: + """Verify HAR params match a manual OLS computation.""" + + def test_har_against_numpy_lstsq(self): + """HAR params should match manually built OLS solution.""" + rv = np.asarray(RV_SERIES, dtype=float) + lags = [1, 5, 22] + max_lag = 22 + n = len(rv) - max_lag + + # Build design matrix manually (matching HAR._build_features logic) + X_manual = np.ones((n, 4)) + for i in range(n): + t = i + max_lag + X_manual[i, 1] = rv[t - 1 : t].mean() # lag 1 + X_manual[i, 2] = rv[t - 5 : t].mean() # lag 5 + X_manual[i, 3] = rv[t - 22 : t].mean() # lag 22 + y_manual = rv[max_lag:] + + params_manual, *_ = np.linalg.lstsq( + X_manual.T @ X_manual, + X_manual.T @ y_manual, + rcond=None, + ) + + model = HAR(RV_SERIES, lags=lags) + result = model.fit() + + assert_allclose(result.params_, params_manual, rtol=1e-8, atol=1e-12) + + def test_har_rsquared_against_manual(self): + """R-squared should match the manual calculation.""" + rv = np.asarray(RV_SERIES, dtype=float) + max_lag = 22 + n = len(rv) - max_lag + + X_manual = np.ones((n, 4)) + for i in range(n): + t = i + max_lag + X_manual[i, 1] = rv[t - 1 : t].mean() + X_manual[i, 2] = rv[t - 5 : t].mean() + X_manual[i, 3] = rv[t - 22 : t].mean() + y_manual = rv[max_lag:] + + params_manual, *_ = np.linalg.lstsq( + X_manual.T @ X_manual, + X_manual.T @ y_manual, + rcond=None, + ) + resid = y_manual - X_manual @ params_manual + ss_res = resid @ resid + ss_tot = np.sum((y_manual - y_manual.mean()) ** 2) + r2_manual = 1.0 - ss_res / ss_tot + + model = HAR(RV_SERIES) + result = model.fit() + assert_allclose(result.rsquared, r2_manual, rtol=1e-10) + + +class TestHARInputValidation: + """Tests for input validation in HAR.__init__.""" + + def test_har_rejects_negative_lags(self): + with pytest.raises(ValueError, match="positive integers"): + HAR(RV_SERIES, lags=[-1, 5]) + + def test_har_rejects_zero_lag(self): + with pytest.raises(ValueError, match="positive integers"): + HAR(RV_SERIES, lags=[0, 5]) + + def test_har_rejects_empty_lags(self): + with pytest.raises(ValueError, match="non-empty"): + HAR(RV_SERIES, lags=[]) + + def test_har_rejects_too_short_series(self): + short = pd.Series(np.ones(10)) + with pytest.raises(ValueError): + HAR(short, lags=[1, 5, 22]) + + def test_har_rejects_bad_cov_type(self): + model = HAR(RV_SERIES) + with pytest.raises(ValueError, match="cov_type"): + model.fit(cov_type="nonsense") + + def test_har_accepts_numpy_array(self): + """HAR should accept a plain numpy array as input.""" + rv_arr = np.abs(np.random.randn(300)) ** 2 + model = HAR(rv_arr) + result = model.fit() + assert isinstance(result, HARResult) + + +class TestHARUnadjustedCov: + """Tests for the unadjusted (homoskedastic) covariance option.""" + + def test_unadjusted_pvalues_in_range(self): + model = HAR(RV_SERIES) + result = model.fit(cov_type="unadjusted") + pvals = result.pvalues + assert (pvals >= 0.0).all() + assert (pvals <= 1.0).all() + + def test_unadjusted_params_same_as_robust(self): + """OLS coefficients are the same regardless of cov_type.""" + model = HAR(RV_SERIES) + res_robust = model.fit(cov_type="robust") + res_plain = model.fit(cov_type="unadjusted") + assert_allclose(res_robust.params_, res_plain.params_, rtol=1e-12) + + +class TestHARForecast: + """Tests for the forecast method.""" + + def test_forecast_horizon_1(self): + model = HAR(RV_SERIES) + result = model.fit() + fc = result.forecast(horizon=1) + assert fc.shape == (1,) + assert np.isfinite(fc[0]) + + def test_forecast_horizon_22(self): + model = HAR(RV_SERIES) + result = model.fit() + fc = result.forecast(horizon=22) + assert fc.shape == (22,) + assert np.all(np.isfinite(fc)) + + def test_forecast_horizon_0_raises(self): + model = HAR(RV_SERIES) + result = model.fit() + with pytest.raises(ValueError, match="horizon"): + result.forecast(horizon=0) + + def test_forecast_is_nonnegative_for_nonneg_data(self): + """For positive RV data and positive coefficients, forecast should be finite.""" + model = HAR(RV_SERIES) + result = model.fit() + fc = result.forecast(horizon=5) + # Just check finiteness; can't guarantee non-negative with arbitrary params + assert np.all(np.isfinite(fc)) + + +class TestHARProperties: + """Tests for HARResult computed properties.""" + + def test_params_is_series(self): + result = HAR(RV_SERIES).fit() + assert isinstance(result.params, pd.Series) + assert list(result.params.index) == ["const", "daily", "weekly", "monthly"] + + def test_std_errors_positive(self): + result = HAR(RV_SERIES).fit() + se = result.std_errors + assert (se > 0).all(), "All standard errors should be positive" + + def test_tvalues_shape(self): + result = HAR(RV_SERIES).fit() + tv = result.tvalues + assert tv.shape == (4,) diff --git a/arch/univariate/__init__.py b/arch/univariate/__init__.py index 0f30e5980c..7d9496fba8 100644 --- a/arch/univariate/__init__.py +++ b/arch/univariate/__init__.py @@ -8,6 +8,7 @@ SkewStudent, StudentsT, ) +from arch.univariate.har import HAR, HARResult from arch.univariate.mean import ( ARX, HARX, @@ -41,6 +42,8 @@ "APARCH", "ARCH", "ARX", + "HAR", + "HARResult", "EGARCH", "FIGARCH", "GARCH", diff --git a/arch/univariate/har.py b/arch/univariate/har.py new file mode 100644 index 0000000000..0d8b365974 --- /dev/null +++ b/arch/univariate/har.py @@ -0,0 +1,386 @@ +""" +HAR (Heterogeneous Autoregressive) model for realized volatility. + +Implements the Corsi (2009) HAR-RV model via OLS: + + RV_t = c + beta_d * RV_{t-1} + beta_w * RV^(w)_{t-1} + beta_m * RV^(m)_{t-1} + eps_t + +where RV^(w) and RV^(m) are rolling averages over 5 and 22 days respectively. + +Reference +--------- +Corsi, F. (2009). A Simple Approximate Long-Memory Model of Realized Volatility. +*Journal of Financial Econometrics*, 7(2), 174-196. +""" + +from __future__ import annotations + +from typing import Optional, Sequence, Union + +import numpy as np +import pandas as pd +from scipy import stats + +__all__ = ["HAR", "HARResult"] + + +def _newey_west_cov(X: np.ndarray, resid: np.ndarray, lags: int) -> np.ndarray: + """Compute Newey-West HAC covariance matrix. + + Parameters + ---------- + X : ndarray of shape (n, k) + Regressor matrix. + resid : ndarray of shape (n,) + OLS residuals. + lags : int + Number of lags for Newey-West kernel. + + Returns + ------- + ndarray of shape (k, k) + HAC covariance matrix for the OLS estimator. + """ + n, k = X.shape + # Score matrix: (n, k) + scores = X * resid[:, np.newaxis] + + # Sandwich meat: S = Gamma_0 + sum_{l=1}^{lags} w_l * (Gamma_l + Gamma_l') + # w_l = 1 - l / (lags + 1) (Bartlett kernel) + S = scores.T @ scores # Gamma_0 + + for lag in range(1, lags + 1): + w = 1.0 - lag / (lags + 1) + gamma = scores[lag:].T @ scores[:-lag] + S += w * (gamma + gamma.T) + + # Bread: (X'X)^{-1} + XtX_inv = np.linalg.inv(X.T @ X) + + # Sandwich: (X'X)^{-1} S (X'X)^{-1} + cov = XtX_inv @ S @ XtX_inv + return cov + + +class HARResult: + """Result object returned by :meth:`HAR.fit`. + + Attributes + ---------- + params : ndarray + OLS coefficient estimates in order: [const, beta_1, beta_2, ...]. + param_names : list of str + Names corresponding to each parameter. + resid_ : ndarray + OLS residuals. + nobs : int + Number of observations used in estimation. + cov_params_ : ndarray + Estimated covariance matrix of parameters. + """ + + def __init__( + self, + params: np.ndarray, + param_names: list[str], + resid: np.ndarray, + X: np.ndarray, + y: np.ndarray, + cov_params: np.ndarray, + lags: list[int], + y_original: pd.Series, + ) -> None: + self.params_ = params + self.param_names = param_names + self.resid_ = resid + self._X = X + self._y = y + self.cov_params_ = cov_params + self.lags = lags + self._y_original = y_original + self.nobs = len(y) + + @property + def rsquared(self) -> float: + """R-squared of the OLS fit.""" + ss_res = float(self.resid_ @ self.resid_) + ss_tot = float(np.sum((self._y - self._y.mean()) ** 2)) + if ss_tot == 0: + return float("nan") + return 1.0 - ss_res / ss_tot + + @property + def pvalues(self) -> pd.Series: + """HAC-robust two-sided p-values for each parameter (Newey-West).""" + se = np.sqrt(np.diag(self.cov_params_)) + t_stats = self.params_ / se + pvals = 2.0 * stats.t.sf(np.abs(t_stats), df=self.nobs - len(self.params_)) + return pd.Series(pvals, index=self.param_names) + + @property + def std_errors(self) -> pd.Series: + """HAC-robust standard errors.""" + se = np.sqrt(np.diag(self.cov_params_)) + return pd.Series(se, index=self.param_names) + + @property + def tvalues(self) -> pd.Series: + """t-statistics using HAC standard errors.""" + return pd.Series( + self.params_ / np.sqrt(np.diag(self.cov_params_)), + index=self.param_names, + ) + + @property + def params(self) -> pd.Series: + """Estimated parameters as a pandas Series.""" + return pd.Series(self.params_, index=self.param_names) + + def predict( + self, + start: Optional[int] = None, + end: Optional[int] = None, + ) -> pd.Series: + """Return in-sample fitted values. + + Parameters + ---------- + start : int, optional + Starting index (default: 0). + end : int, optional + Ending index exclusive (default: nobs). + + Returns + ------- + pd.Series + Fitted values. + """ + fitted = self._X @ self.params_ + start = start if start is not None else 0 + end = end if end is not None else self.nobs + return pd.Series(fitted[start:end], name="fitted_values") + + def forecast(self, horizon: int = 1) -> np.ndarray: + """Multi-step ahead forecast using recursive substitution. + + For HAR-RV, the h-step forecast iterates forward by appending the + one-step forecast to the end of the series and recomputing the + rolling averages. + + Parameters + ---------- + horizon : int + Number of steps ahead to forecast. + + Returns + ------- + ndarray of shape (horizon,) + Forecasted values. + """ + if horizon < 1: + raise ValueError("horizon must be >= 1") + + # Reconstruct the series including estimation sample + max_lag = max(self.lags) + # We need the raw rv series to build the feature rows for new steps. + # _y_original contains the full original series; we take the tail we need. + rv = np.asarray(self._y_original, dtype=float) + + # Extract the coefficients: [const, beta_1, beta_2, ...] + const = self.params_[0] + betas = self.params_[1:] # one per lag + + forecasts = [] + # Working copy: extend with forecasted values iteratively + rv_ext = rv.copy().tolist() + + for _ in range(horizon): + row = [1.0] + for lag in self.lags: + # Average of the last `lag` values + window = rv_ext[-lag:] + row.append(float(np.mean(window))) + yhat = const + float(np.array(row[1:]) @ betas) + forecasts.append(yhat) + rv_ext.append(yhat) + + return np.array(forecasts) + + def summary(self) -> str: + """Return a text summary table of the HAR estimation results.""" + lines = [] + lines.append("=" * 60) + lines.append("HAR-RV Model Results (Corsi 2009)") + lines.append("=" * 60) + lines.append(f"No. Observations: {self.nobs:>10d}") + lines.append(f"Lags: {str(self.lags):>10s}") + lines.append(f"R-squared: {self.rsquared:>10.4f}") + lines.append("-" * 60) + lines.append( + f"{'Parameter':<20} {'Coef':>10} {'Std Err':>10} " + f"{'t-stat':>10} {'p-value':>10}" + ) + lines.append("-" * 60) + ses = np.sqrt(np.diag(self.cov_params_)) + tstats = self.params_ / ses + pvals = self.pvalues.values + for name, coef, se, t, p in zip( + self.param_names, self.params_, ses, tstats, pvals + ): + lines.append( + f"{name:<20} {coef:>10.6f} {se:>10.6f} {t:>10.4f} {p:>10.4f}" + ) + lines.append("=" * 60) + lines.append("Std. Errors: Newey-West HAC") + return "\n".join(lines) + + +class HAR: + """Heterogeneous Autoregressive model for realized volatility (HAR-RV). + + Implements the Corsi (2009) model via OLS: + + RV_t = c + beta_d * RV_{t-1} + beta_w * RV^(w)_{t-1} + + beta_m * RV^(m)_{t-1} + epsilon_t + + where the component averages are defined as: + + RV^(w)_{t-1} = mean(RV_{t-1}, ..., RV_{t-5}) + RV^(m)_{t-1} = mean(RV_{t-1}, ..., RV_{t-22}) + + Parameters + ---------- + y : pd.Series or array-like + Time series of realized variances (must be non-negative). + lags : list of int, optional + Lag horizons for the HAR components. Default is [1, 5, 22] + (daily, weekly, monthly). Each entry ``L`` contributes one + regressor: the rolling mean of the previous ``L`` observations. + + Examples + -------- + >>> import numpy as np + >>> import pandas as pd + >>> rng = np.random.default_rng(42) + >>> rv = pd.Series(np.abs(rng.standard_normal(500)) ** 2) + >>> model = HAR(rv) + >>> result = model.fit() + >>> print(result.params) + >>> print(result.rsquared) + """ + + def __init__( + self, + y: Union[pd.Series, np.ndarray], + lags: Optional[list[int]] = None, + ) -> None: + if lags is None: + lags = [1, 5, 22] + if not all(isinstance(lag, (int, np.integer)) and lag >= 1 for lag in lags): + raise ValueError("All lags must be positive integers.") + if len(lags) == 0: + raise ValueError("lags must be non-empty.") + + self.lags: list[int] = sorted(int(lag) for lag in lags) + self._max_lag = max(self.lags) + + if isinstance(y, pd.Series): + self._y = y + else: + self._y = pd.Series(np.asarray(y, dtype=float)) + + if len(self._y) <= self._max_lag: + raise ValueError( + f"y must have more than {self._max_lag} observations " + f"(max lag = {self._max_lag})." + ) + + def _build_features(self) -> tuple[np.ndarray, np.ndarray, list[str]]: + """Build the OLS regressor matrix and response vector. + + Returns + ------- + X : ndarray of shape (T - max_lag, 1 + len(lags)) + Design matrix: [1, rolling_mean_lag1, rolling_mean_lag2, ...]. + y_dep : ndarray of shape (T - max_lag,) + Dependent variable: RV_t for t = max_lag, ..., T-1. + names : list of str + Column names for X. + """ + rv = np.asarray(self._y, dtype=float) + T = len(rv) + n = T - self._max_lag # effective sample size + + # Response: rv[max_lag], rv[max_lag+1], ..., rv[T-1] + y_dep = rv[self._max_lag :] + + # Build one column per lag + # For lag L at time t, the feature is mean(rv[t-L], ..., rv[t-1]) + # i.e. the mean of the L observations ending at t-1. + cols = [np.ones(n)] + names = ["const"] + + # Precompute lag names + lag_labels = {1: "daily", 5: "weekly", 22: "monthly"} + + for lag in self.lags: + col = np.empty(n) + for i in range(n): + t = i + self._max_lag # current row corresponds to rv[t] + col[i] = rv[t - lag : t].mean() + cols.append(col) + + if lag in lag_labels: + names.append(lag_labels[lag]) + else: + names.append(f"rv_lag{lag}") + + X = np.column_stack(cols) + return X, y_dep, names + + def fit(self, cov_type: str = "robust") -> HARResult: + """Fit the HAR model by OLS. + + Parameters + ---------- + cov_type : {'robust', 'unadjusted'} + Covariance estimator for standard errors. + + * ``'robust'``: Newey-West HAC with automatic bandwidth + (``floor(4 * (n/100)^(2/9))`` lags, per convention). + * ``'unadjusted'``: Homoskedastic OLS covariance. + + Returns + ------- + HARResult + """ + if cov_type not in ("robust", "unadjusted"): + raise ValueError("cov_type must be 'robust' or 'unadjusted'.") + + X, y_dep, names = self._build_features() + n, k = X.shape + + # OLS: beta = (X'X)^{-1} X'y + XtX = X.T @ X + Xty = X.T @ y_dep + params, *_ = np.linalg.lstsq(XtX, Xty, rcond=None) + resid = y_dep - X @ params + + if cov_type == "robust": + # Newey-West bandwidth: floor(4 * (n/100)^(2/9)) + nw_lags = max(1, int(np.floor(4.0 * (n / 100.0) ** (2.0 / 9.0)))) + cov_params = _newey_west_cov(X, resid, nw_lags) + else: + s2 = float(resid @ resid) / (n - k) + cov_params = s2 * np.linalg.inv(XtX) + + return HARResult( + params=params, + param_names=names, + resid=resid, + X=X, + y=y_dep, + cov_params=cov_params, + lags=self.lags, + y_original=self._y, + )