Source code for scaleinvariance.analysis.structure_function

import numpy as np
import warnings

from .. import backend as B
from ..utils import process_lags, estimate_hurst_from_scaling


[docs] def structure_function(data, order=1, max_sep=None, axis=0, lags='powers of 1.2', periodic=False): """ Calculate the mean structure function for regularly spaced scalar data along a given axis. The structure function S_n(r) of order n is defined as: S_n(r) = <|u(x + r) - u(x)|^n> where <> denotes the average over all valid pairs separated by lag r. Parameters ----------- data : array-like N-dimensional scalar data array. order : float or 1-D array-like of float, optional Order(s) of the structure function (default: 1). Each order must be positive. When an array is given, the absolute increments |u(x+r) - u(x)| are computed once per lag and reused across all orders (avoids redundant work). max_sep : int, optional Maximum lag to compute. If None, defaults to (size along axis) - 1, or (size along axis) // 2 when ``periodic=True``. axis : int, optional Axis along which to compute the structure function (default: 0). lags : str, list, or np.ndarray, optional Option for selecting lag values. Can be: - A list or array of lags. - 'all': all integer lags from 1 to max_sep. - 'powers of X': lags as integers of powers of X, where X is any number (e.g., 'powers of 2', 'powers of 1.2'). periodic : bool, optional If True, treat the domain as periodic (toroidal) along ``axis``: the increment at lag r uses all L pairs ``u((x + r) mod L) - u(x)`` rather than dropping the L - r edge pairs. On a periodic domain S(r) = S(L - r) exactly, so lags above L // 2 are redundant; ``max_sep`` is therefore capped at L // 2 and an explicit ``max_sep > L // 2`` raises ValueError. Returns -------- tuple (np.ndarray, np.ndarray) lags : 1-D array of lag values. sf : structure function values. Shape is (n_lags,) if `order` is a scalar, or (n_orders, n_lags) if `order` is a 1-D array. The lag axis is always last. """ data = B.asarray(data) order_is_scalar = np.ndim(order) == 0 order_arr = np.atleast_1d(np.asarray(order, dtype=np.float64)) if order_arr.ndim != 1: raise ValueError("Order must be a scalar or 1-D array.") if np.any(order_arr <= 0): raise ValueError("All orders must be positive.") # Validate axis if axis < 0: axis += data.ndim if axis < 0 or axis >= data.ndim: raise ValueError("Axis is out of bounds for data dimensions.") L = data.shape[axis] if max_sep is None: max_sep = (L // 2) if periodic else (L - 1) else: if periodic and max_sep > L // 2: raise ValueError( f"For periodic=True, max_sep must be <= L // 2 ({L // 2}) because " f"S(r) = S(L - r) on a periodic domain; got max_sep={max_sep}." ) max_sep = min(max_sep, L - 1) # Process lag options lags = process_lags(lags, max_sep, even_only=False) sf_values = np.empty((order_arr.size, lags.size), dtype=np.float64) # Compute structure function values for each lag along the specified axis for i, lag in enumerate(lags): if periodic: # Toroidal: all L pairs u((x + r) mod L) - u(x). diff = B.abs(B.roll(data, -int(lag), axis=axis) - data) else: # Build slices for the given axis (drops the L - r edge pairs). slice1 = [slice(None)] * data.ndim slice2 = [slice(None)] * data.ndim slice1[axis] = slice(lag, None) slice2[axis] = slice(None, -lag) diff = B.abs(data[tuple(slice1)] - data[tuple(slice2)]) if (B.numel(diff) == 0) or B.all(B.isnan(diff)): sf_values[:, i] = np.nan continue for j, o in enumerate(order_arr): sf_values[j, i] = float(B.nanmean(diff ** o)) if order_is_scalar: sf_values = sf_values[0] return lags, sf_values
[docs] def costructure_function(data1, data2, order1=1, order2=1, max_sep=None, axis=0, lags='powers of 1.2', periodic=False): """ Calculate the mixed (two-field, two-exponent) structure function. Defined as: S_{p,q}(r) = <|u_1(x + r) - u_1(x)|^p * |u_2(x + r) - u_2(x)|^q> where <> denotes the average over positions where neither increment is NaN. (Matches the NaN-only masking convention of `structure_function` and `haar_fluctuation`; `inf` values are not specially handled.) The diagonal p = q case recovers a single-exponent co-structure function; the p -> 0 or q -> 0 slices recover the single-field structure functions of the other field (up to the caveat that 0 is not a permitted order here). Parameters ----------- data1, data2 : array-like N-dimensional scalar data arrays. Must have identical shape. order1, order2 : float or 1-D array-like of float, optional Exponent(s) applied to |delta f_1| and |delta f_2| respectively (default: 1). Each order must be positive. Arrays are allowed on either or both; the absolute increments of each field are computed once per lag and reused across all (order1, order2) combinations. max_sep : int, optional Maximum lag to compute. If None, defaults to (size along axis) - 1. axis : int, optional Axis along which to compute increments (default: 0). lags : str, list, or np.ndarray, optional Lag selection (see `structure_function` for accepted forms). periodic : bool, optional If True, treat the domain as periodic (toroidal) along ``axis``: both increments wrap around the array end (all L pairs per lag). ``max_sep`` is capped at L // 2 and an explicit ``max_sep > L // 2`` raises ValueError (see `structure_function`). Returns -------- tuple (np.ndarray, np.ndarray) lags : 1-D array of lag values. cosf : co-structure function values. Shape depends on order1/order2 being scalar vs array: - (n_lags,) both scalar - (n_order1, n_lags) order1 array, order2 scalar - (n_order2, n_lags) order1 scalar, order2 array - (n_order1, n_order2, n_lags) both arrays The lag axis is always last. Notes ----- The average is over positions where neither increment is NaN. This is asymmetric with `structure_function`, which averages over positions where the single field's increment is non-NaN. If one field has NaNs and the other does not, the effective sample count here is the intersection of valid positions. `inf` values are not specially handled and will propagate through to the result (consistent with the NaN-only masking of the single-field `structure_function`). """ data1 = B.asarray(data1) data2 = B.asarray(data2) if data1.shape != data2.shape: raise ValueError( f"data1 and data2 must have the same shape; got {data1.shape} vs {data2.shape}" ) o1_is_scalar = np.ndim(order1) == 0 o2_is_scalar = np.ndim(order2) == 0 o1 = np.atleast_1d(np.asarray(order1, dtype=np.float64)) o2 = np.atleast_1d(np.asarray(order2, dtype=np.float64)) if o1.ndim != 1 or o2.ndim != 1: raise ValueError("order1 and order2 must each be scalar or 1-D.") if np.any(o1 <= 0) or np.any(o2 <= 0): raise ValueError("All orders must be positive.") if axis < 0: axis += data1.ndim if axis < 0 or axis >= data1.ndim: raise ValueError("Axis is out of bounds for data dimensions.") L = data1.shape[axis] if max_sep is None: max_sep = (L // 2) if periodic else (L - 1) else: if periodic and max_sep > L // 2: raise ValueError( f"For periodic=True, max_sep must be <= L // 2 ({L // 2}) because " f"S(r) = S(L - r) on a periodic domain; got max_sep={max_sep}." ) max_sep = min(max_sep, L - 1) lags = process_lags(lags, max_sep, even_only=False) cosf = np.empty((o1.size, o2.size, lags.size), dtype=np.float64) for i, lag in enumerate(lags): if periodic: d1 = B.abs(B.roll(data1, -int(lag), axis=axis) - data1) d2 = B.abs(B.roll(data2, -int(lag), axis=axis) - data2) else: slice1 = [slice(None)] * data1.ndim slice2 = [slice(None)] * data1.ndim slice1[axis] = slice(lag, None) slice2[axis] = slice(None, -lag) sl1 = tuple(slice1) sl2 = tuple(slice2) d1 = B.abs(data1[sl1] - data1[sl2]) d2 = B.abs(data2[sl1] - data2[sl2]) if (B.numel(d1) == 0) or B.all(B.isnan(d1)) or B.all(B.isnan(d2)): cosf[:, :, i] = np.nan continue for j, p in enumerate(o1): d1p = d1 ** p for k, q in enumerate(o2): cosf[j, k, i] = float(B.nanmean(d1p * (d2 ** q))) # Squeeze scalar order axes. Lags axis is last and is never squeezed. if o1_is_scalar and o2_is_scalar: cosf = cosf[0, 0] elif o1_is_scalar: cosf = cosf[0] elif o2_is_scalar: cosf = cosf[:, 0] return lags, cosf
[docs] def structure_function_hurst(data, min_sep=None, max_sep=None, axis=0, return_fit=False, periodic=False): """ Calculate the Hurst exponent using structure functions. For a self-similar process with Hurst exponent H, the first order structure function scales as: S_1(r) ~ r^H This function estimates H by performing a linear regression on log(S_1(r)) vs log(r). Parameters ----------- data : array-like N-dimensional scalar data array. min_sep : int, optional Minimum separation/lag to include in the fit. If None, defaults to 1. max_sep : int, optional Maximum separation/lag to include in the fit. If None, defaults to array_size/8 or array_size if array_size < 128 (with warning). axis : int, optional Axis along which to compute the structure function (default: 0). return_fit : bool, optional If True, return the lags, structure function values, and fitted line. periodic : bool, optional If True, use periodic (toroidal) increments along ``axis`` (see `structure_function`). The default ``max_sep`` becomes array_size // 2. Returns -------- If return_fit is False: H : float Estimated Hurst exponent. uncertainty : float Standard error of the Hurst exponent estimate. If return_fit is True: H : float Estimated Hurst exponent. uncertainty : float Standard error of the Hurst exponent estimate. lags : np.ndarray The lags used in the calculation. sf_values : np.ndarray The structure function values. fit_line : np.ndarray The fitted line used to estimate H. """ data = B.asarray(data) array_size = data.shape[axis] # Check minimum array size for reliable Hurst estimation if array_size < 16: raise ValueError(f"Array size along axis {axis} is {array_size}, but minimum 16 points required for Hurst estimation") # Set default separations to isolate scaling range >> grid size and << domain size if max_sep is None: max_sep = (array_size // 2) if periodic else (array_size - 1) if array_size <= 512: warnings.warn(f"Array size ({array_size}) is small. Using max_sep={max_sep}. " "Consider using larger arrays for more reliable Hurst estimation.", UserWarning) if min_sep is None: if array_size <= 512: min_sep = 1 else: min_sep = 4 # Calculate structure function lags, sf_values = structure_function(data, order=1, max_sep=max_sep, axis=axis, lags='powers of 1.2', periodic=periodic) # Use common Hurst estimation utility return estimate_hurst_from_scaling(lags, sf_values, min_sep, max_sep, return_fit)
[docs] def structure_function_analysis(*args, **kwargs): """Deprecated: use structure_function() instead.""" import warnings warnings.warn("structure_function_analysis is deprecated, use structure_function instead", DeprecationWarning, stacklevel=2) return structure_function(*args, **kwargs)