Source code for scaleinvariance.simulation.FIF

import numpy as np
import warnings
from .. import backend as B
from .fbm import fBm_1D_circulant, fBm_ND_circulant
from .kernels import (
    create_kernel_LS2010,
    create_kernel_naive,
    create_kernel_spectral,
    create_kernel_spectral_odd,
)

def extremal_levy(alpha, size=1):
    """
    Generate random samples from an extremal Lévy distribution using a modified
    Chambers–Mallows–Stuck method.

    Translated from Mathematica code provided by Lovejoy:
    https://www.physics.mcgill.ca/~gang/multifrac/multifractals/software.htm

    Parameters:
        alpha (float): Stability parameter in (0, 2)
        size (int): Number of samples to generate (default is 1)

    Returns:
        ndarray: Array of generated extremal Lévy random variables
    """
    phi = (B.rand(size) - 0.5) * B.pi
    alpha_t = alpha
    phi0 = -(B.pi/2) * (1 - B.abs(1 - alpha_t)) / alpha_t
    R = B.exponential(scale=1.0, size=size)
    eps = 1e-12
    cos_phi = B.cos(phi)
    cos_phi = B.where(B.abs(cos_phi) < eps, B.full_like(cos_phi, eps), cos_phi)
    abs_alpha1 = B.abs(alpha_t - 1)
    abs_alpha1 = B.where(abs_alpha1 < eps, B.full_like(abs_alpha1, eps), abs_alpha1)

    denom = B.cos(phi - alpha_t * (phi - phi0))
    denom = B.where(denom < eps, B.full_like(denom, eps), denom)

    R = B.where(R < eps, B.full_like(R, eps), R)
    sample = (
        B.sign(alpha_t - 1) *
        B.sin(alpha_t * (phi - phi0)) *
        (cos_phi * abs_alpha1) ** (-1/alpha_t) *
        (denom / R) ** ((1 - alpha_t) / alpha_t)
    )
    return sample

# Convolutions
def periodic_convolve(signal, kernel):
    """
    Performs periodic convolution of two 1D arrays using Fourier methods.
    Both arrays must have the same length.

    Parameters:
        signal (array-like): Input signal array.
        kernel (array-like): Convolution kernel array.

    Returns:
        ndarray: The result of the periodic convolution.

    Raises:
        ValueError: If signal and kernel lengths do not match.
    """
    signal = B.asarray(signal)
    kernel = B.asarray(kernel)

    if signal.size != kernel.size:
        raise ValueError("Signal and kernel must have the same length for periodic convolution.")

    # Shift kernel so zero-lag is at index 0 (required for FFT convolution)
    kernel = B.ifftshift(kernel)

    fft_signal = B.fft(signal)
    fft_kernel = B.fft(kernel)

    convolved = B.real(B.ifft(fft_signal * fft_kernel))
    return convolved

def periodic_convolve_nd(signal, kernel):
    """
    Performs periodic convolution of two N-D arrays using Fourier methods.
    Both arrays must have the same shape.

    Parameters:
        signal (ndarray): Input signal array (N-D).
        kernel (ndarray): Convolution kernel array (N-D).

    Returns:
        ndarray: The result of the periodic convolution.

    Raises:
        ValueError: If signal and kernel shapes do not match.
    """
    signal = B.asarray(signal)
    kernel = B.asarray(kernel)

    if signal.shape != kernel.shape:
        raise ValueError("Signal and kernel must have the same shape for periodic convolution.")

    # Shift kernel so zero-lag is at index (0,0,...,0) (required for FFT convolution)
    kernel = B.ifftshift(kernel)

    fft_signal = B.fftn(signal)
    fft_kernel = B.fftn(kernel)

    convolved = B.real(B.ifftn(fft_signal * fft_kernel))
    return convolved


def _warn_if_naive_kernel_selected(kernel_construction_method_flux, kernel_construction_method_observable):
    """Warn when legacy naive FIF kernels are selected."""
    if 'naive' in (kernel_construction_method_flux, kernel_construction_method_observable):
        warnings.warn(
            "Naive FIF kernels are deprecated because their outputs are not remotely accurate. "
            "Use kernel_construction_method_flux/kernel_construction_method_observable="
            "'LS2010' or 'spectral' instead.",
            FutureWarning,
            stacklevel=2,
        )

# FIF

[docs] def FIF_1D(size, alpha, C1, H, levy_noise=None, causal=True, outer_scale=None, outer_scale_width_factor=2.0, kernel_construction_method_flux='LS2010', kernel_construction_method_observable='LS2010', periodic=True): """ Generate a 1D Fractionally Integrated Flux (FIF) multifractal simulation. FIF is a multiplicative cascade model that generates multifractal fields with specified Hurst exponent H, intermittency parameter C1, and Levy index alpha. The method follows Lovejoy & Schertzer 2010, including finite-size corrections. Returns field normalized by mean. Algorithm: 1. Generate extremal Lévy noise with stability parameter alpha 2. Convolve with corrected kernel (finite-size corrected |k|^(-1/alpha)) to create log-flux 3. Scale by (C1)^(1/alpha) and exponentiate to get conserved flux 4. For H ≠ 0: Convolve flux with kernel |k|^(-1+H) to get observable 5. For H < 0: Apply differencing to handle negative Hurst exponents Parameters ---------- size : int Length of simulation (must be even, power of 2 recommended) alpha : float Lévy stability parameter in [0.5, 2] and != 1. Controls noise distribution. Values < 0.5 are currently not implemented for FIF simulation. C1 : float Codimension of the mean, controls intermittency strength. Must be > 0 for multifractal behavior. Special case: C1 = 0 routes to fBm (no intermittency), but requires causal=False since fBm cannot be causal. H : float Hurst exponent in (-1, 1). Controls correlation structure. levy_noise : torch.Tensor, optional Pre-generated Lévy noise for reproducibility. Must have same size as simulation. causal : bool, optional Use causal kernels (future doesn't affect past). Default True. False gives symmetric (non-causal) kernels. outer_scale : int, optional Large-scale cutoff. Defaults to size. outer_scale_width_factor : float, optional Controls transition width for outer scale. Transition width = outer_scale * width_factor. Default is 2.0. kernel_construction_method_flux : str, optional Method for constructing flux (cascade) kernel. Options: - 'LS2010': Lovejoy & Schertzer 2010 finite-size corrections (default) - 'spectral': spectral-response kernel for non-causal periodic runs, LS2010 fallback for causal runs - 'naive': Simple power-law kernels without corrections kernel_construction_method_observable : str, optional Method for constructing observable (H) kernel. Options: - 'LS2010': Lovejoy & Schertzer 2010 finite-size corrections (default) - 'spectral': spectral-response kernel for non-causal periodic runs, LS2010 fallback for causal runs - 'naive': Simple power-law kernels without corrections - 'spectral_odd': Odd (antisymmetric) spectral kernel — left half negated periodic : bool, optional If True (default), returns full periodic simulation suitable for periodic boundary conditions. If False, doubles simulation size internally then returns only the first half to eliminate periodicity artifacts. Returns ------- numpy.ndarray 1D array of simulated multifractal field values Raises ------ ValueError If size is not even or H is outside valid range (-1, 1) ValueError If C1 < 0 or if C1 = 0 with causal=True (fBm cannot be causal) ValueError If provided levy_noise doesn't match specified size ValueError If alpha < 0.5 or alpha == 1 (not implemented) Examples -------- >>> # Basic multifractal with strong intermittency >>> fif = FIF_1D(1024, alpha=1.8, C1=0.1, H=0.3) >>> >>> # Monofractal case (C1=0 routes to fBm, requires causal=False) >>> fbm = FIF_1D(1024, alpha=1.8, C1=0, H=0.3, causal=False) Notes ----- - Computational complexity is O(N log N) due to FFT-based convolutions - Large C1 values (> 0.5) can produce extreme values requiring careful handling - alpha < 0.5 and alpha = 1 are not currently implemented - C1 = 0: Routes internally to fBm_1D_circulant() for monofractal case (requires causal=False since fBm cannot be causal) """ if size % 2 != 0: raise ValueError("size must be an even number; a power of 2 is recommended.") output_size = size if not periodic: size *= 2 # duplicate to eliminate periodicity if C1 == 0 and not causal: if levy_noise is not None: raise ValueError('noise argument not supported for C1=0') result = fBm_1D_circulant(output_size, H, periodic=periodic) return result if not isinstance(C1, (int, float)) or C1 < 0: raise ValueError("C1 must be a non-negative number.") if C1 == 0 and causal: raise ValueError("C1=0 requires causal=False (fBm cannot be causal)") H_int = 0 if H < 0: H_int = -1 H += 1 if not isinstance(H, (int, float)) or H < 0 or H > 1: raise ValueError("H must be a number between -1 and 1.") if not isinstance(alpha, (int, float)) or alpha <= 0 or alpha > 2: raise ValueError("alpha must be a number > 0 and <= 2.") if alpha < 0.5: raise ValueError("alpha < 0.5 is not implemented for FIF simulation") if alpha == 1: raise ValueError("alpha=1 not supported") # requires special treatment which is not implemented _warn_if_naive_kernel_selected(kernel_construction_method_flux, kernel_construction_method_observable) if levy_noise is None: noise = extremal_levy(alpha, size=size) else: levy_noise_arr = B.asarray(levy_noise) if levy_noise_arr.size != output_size: raise ValueError("Provided levy_noise must match the specified size.") # if aperiodic is requested, need to pad noise with more noise if not periodic: noise = B.concatenate([levy_noise_arr, extremal_levy(alpha, size=output_size)]) else: noise = levy_noise_arr if outer_scale is None: outer_scale = output_size # Create flux kernel (kernel 1) # Calculate exponent and normalization parameters for flux kernel alpha_prime = 1.0 / (1.0 - 1.0/alpha) flux_exponent = -1.0 / alpha_prime flux_norm_ratio_exp = -1.0 / alpha flux_final_power = 1.0 / (alpha - 1.0) if kernel_construction_method_flux == 'LS2010': kernel1 = create_kernel_LS2010(size, flux_exponent, flux_norm_ratio_exp, causal=causal, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=flux_final_power) elif kernel_construction_method_flux == 'spectral': if causal: kernel1 = create_kernel_LS2010(size, flux_exponent, flux_norm_ratio_exp, causal=causal, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=flux_final_power) else: kernel1 = create_kernel_spectral(size, flux_exponent, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=flux_final_power) elif kernel_construction_method_flux == 'naive': kernel1 = create_kernel_naive(size, flux_exponent, causal=causal, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor) else: raise ValueError(f"Unknown kernel_construction_method_flux: {kernel_construction_method_flux}") integrated = periodic_convolve(noise, kernel1) del noise, kernel1 # Clean memory # If causal, adjust for the fact that half the kernel is being deleted if causal: causality_factor = 2.0 else: causality_factor = 1.0 scaled = integrated * ((causality_factor * C1) ** (1/alpha)) del integrated flux = B.exp(scaled) del scaled if H == 0: # Normalize - slice first (if periodic), then normalize by mean if not periodic: flux = flux[:size//2] # eliminate periodicity by removing the part corresponding to the appended noise flux = flux / B.mean(flux) return flux # Create H kernel (kernel 2) # Calculate exponent and normalization parameters for H kernel H_exponent = -1.0 + H H_norm_ratio_exp = -H if kernel_construction_method_observable == 'LS2010': kernel2 = create_kernel_LS2010(size, H_exponent, H_norm_ratio_exp, causal=causal, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=None) elif kernel_construction_method_observable == 'spectral': if causal: kernel2 = create_kernel_LS2010(size, H_exponent, H_norm_ratio_exp, causal=causal, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=None) else: kernel2 = create_kernel_spectral(size, H_exponent, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=None) elif kernel_construction_method_observable == 'naive': kernel2 = create_kernel_naive(size, H_exponent, causal=causal, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor) elif kernel_construction_method_observable == 'spectral_odd': kernel2 = create_kernel_spectral_odd(size, H_exponent, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor) else: raise ValueError(f"Unknown kernel_construction_method_observable: {kernel_construction_method_observable}") observable = periodic_convolve(flux, kernel2) if not periodic: observable = observable[:size//2] # eliminate periodicity by removing the part corresponding to the appended noise del flux, kernel2 if H_int == -1: # Apply differencing for H<0 observable = B.diff(observable) # Duplicate last value to maintain original size observable = B.concatenate([observable, observable[-1:]]) # Normalize to zero mean (increments should have zero mean) observable = observable - B.mean(observable) elif kernel_construction_method_observable == 'spectral_odd': # Odd kernel produces zero-mean output (antisymmetric kernel sums to zero). # Normalize to zero mean and unit variance instead of unit mean. observable = observable - B.mean(observable) observable = observable / B.std(observable) else: # Normalize to unit mean (levels should have unit mean) observable = observable / B.mean(observable) return observable
[docs] def FIF_ND(size, alpha, C1, H, levy_noise=None, outer_scale=None, outer_scale_width_factor=2.0, kernel_construction_method_flux='LS2010', kernel_construction_method_observable='LS2010', periodic=False, scale_metric=None, scale_metric_dim=None): """ Generate an N-D Fractionally Integrated Flux (FIF) multifractal simulation. FIF is a multiplicative cascade model that generates multifractal fields with specified Hurst exponent H, intermittency parameter C1, and Levy index alpha. The method follows Lovejoy & Schertzer 2010, including finite-size corrections. Returns field normalized by mean. Algorithm: 1. Generate N-D extremal Lévy noise with stability parameter alpha 2. Convolve with corrected kernel (finite-size corrected |r|^(-d/alpha)) to create log-flux 3. Scale by (C1)^(1/alpha) and exponentiate to get conserved flux 4. For H ≠ 0: Convolve flux with kernel |r|^(-d+H) to get observable Parameters ---------- size : tuple of ints Size of simulation as tuple specifying dimensions (e.g., (height, width) for 2D, (depth, height, width) for 3D). All dimensions must be even numbers. alpha : float Lévy stability parameter in [0.5, 2] and != 1. Controls noise distribution. Values < 0.5 are currently not implemented for FIF simulation. C1 : float Codimension of the mean, controls intermittency strength. Must be > 0 for multifractal behavior. Special case: C1 = 0 routes to fBm_ND_circulant() (no intermittency). H : float Hurst exponent in (0, 1). Controls correlation structure. levy_noise : ndarray, optional Pre-generated N-D Lévy noise for reproducibility. Must have same shape as simulation. outer_scale : int, optional Large-scale cutoff. Defaults to max(dimensions). outer_scale_width_factor : float, optional Controls transition width for outer scale. Transition width = outer_scale * width_factor. Default is 2.0. kernel_construction_method_flux : str, optional Method for constructing flux (cascade) kernel. Options: - 'LS2010': Lovejoy & Schertzer 2010 finite-size corrections (default) - 'spectral': exact isotropic spectral-response kernel for periodic N-D runs kernel_construction_method_observable : str, optional Method for constructing observable (H) kernel. Options: - 'LS2010': Lovejoy & Schertzer 2010 finite-size corrections (default) - 'spectral': exact isotropic spectral-response kernel for periodic N-D runs periodic : bool or tuple of bool, optional Controls periodicity behavior for each axis. - If bool: applies same periodicity to all axes (default False) - If tuple: specifies per-axis periodicity, must have length matching size tuple When an axis is non-periodic (False), the domain is doubled along that axis internally and one hyper-octant is returned to suppress periodic artifacts. scale_metric : ndarray, optional Custom N-D array defining the scale metric (generalized distance) for GSI norms. If None (default), standard Euclidean distance is computed. **Shape requirement**: Must match simulation domain shape after accounting for periodicity (doubled along non-periodic axes). For example, if size=(256, 256) and periodic=False, scale_metric.shape must be (512, 512). **Note**: For LS2010 method, the metric should use dx=2 spacing to match the kernel construction grid. scale_metric_dim : float, optional Dimension for scaling exponents in kernel calculations. This may differ from the spatial dimension when using GSI norms with non-integer dimensions. If None (default), uses the spatial dimension (inferred from size tuple length). **Use case**: In Generalized Scale Invariance (GSI), the scaling dimension may be non-integer and different from the embedding space dimension. Returns ------- numpy.ndarray N-D array of simulated multifractal field values Raises ------ ValueError If size dimensions are not even or H is outside valid range (0, 1) ValueError If C1 < 0 (must be non-negative; C1 = 0 routes to fBm) ValueError If provided levy_noise doesn't match specified size ValueError If alpha < 0.5 or alpha == 1 (not implemented) Examples -------- >>> # 2D multifractal with strong intermittency >>> fif = FIF_ND((512, 512), alpha=1.8, C1=0.1, H=0.3) >>> >>> # 3D multifractal >>> fif = FIF_ND((128, 128, 128), alpha=1.5, C1=0.05, H=0.2) >>> >>> # 2D with per-axis periodicity (periodic in x, non-periodic in y) >>> fif = FIF_ND((256, 256), alpha=1.6, C1=0.08, H=0.4, periodic=(True, False)) >>> >>> # Monofractal case (C1=0 routes to fBm_ND_circulant) >>> fbm = FIF_ND((512, 512), alpha=1.8, C1=0, H=0.3) >>> >>> # Using custom GSI norm (example with anisotropic scaling) >>> import numpy as np >>> size = (256, 256) >>> sim_size = (512, 512) # doubled for non-periodic >>> # Create coordinate arrays with dx=2 spacing (to match LS2010) >>> x = np.arange(-(sim_size[0]-1), sim_size[0], 2) >>> y = np.arange(-(sim_size[1]-1), sim_size[1], 2) >>> X, Y = np.meshgrid(x, y, indexing='ij') >>> # Define anisotropic GSI metric with different scaling in x and y >>> scale_metric = (X**2 + (Y/2)**2)**0.5 # y-direction scales faster >>> # Use non-integer dimension for GSI >>> fif = FIF_ND(size, alpha=1.7, C1=0.1, H=0.3, periodic=False, ... scale_metric=scale_metric, scale_metric_dim=2.3) Notes ----- - Computational complexity is O(N log N) due to FFT-based convolutions - Large C1 values (> 0.5) can produce extreme values requiring careful handling - alpha < 0.5 and alpha = 1 are not currently implemented - N-D kernels are always non-causal - Spectral N-D kernels currently assume the default isotropic Fourier metric and do not support custom `scale_metric` - Does not support negative H values (use FIF_1D for H < 0) - C1 = 0: Routes internally to fBm_ND_circulant() for monofractal case """ # Handle size parameter and infer dimension if not isinstance(size, tuple): raise ValueError("size must be a tuple of dimensions (e.g., (512, 512) for 2D)") output_size = size ndim = len(size) # Handle periodic parameter (bool or tuple of bool) if isinstance(periodic, bool): periodic_tuple = tuple(periodic for _ in range(ndim)) elif isinstance(periodic, tuple): if len(periodic) != ndim: raise ValueError(f"periodic tuple length ({len(periodic)}) must match size tuple length ({ndim})") if not all(isinstance(p, bool) for p in periodic): raise ValueError("All elements in periodic tuple must be boolean") periodic_tuple = periodic else: raise ValueError("periodic must be either a bool or a tuple of bool") # Determine simulation domain size (double each dimension if not periodic) sim_size = tuple(s if periodic_tuple[i] else s * 2 for i, s in enumerate(output_size)) # Validate scale_metric shape if provided if scale_metric is not None: scale_metric = B.asarray(scale_metric) if scale_metric.shape != sim_size: raise ValueError( f"scale_metric shape {scale_metric.shape} must match simulation domain shape {sim_size}. " f"For periodic={periodic}, with size={output_size}, the simulation domain is " f"{sim_size} (doubled along non-periodic axes)." ) # Set dimension for scaling exponents (defaults to spatial dimension) if scale_metric_dim is None: scale_metric_dim = float(ndim) else: scale_metric_dim = float(scale_metric_dim) # C1==0 shortcut: use fBm (no intermittency) if C1 == 0: if levy_noise is not None: raise ValueError('levy_noise argument not supported for C1=0 (fBm shortcut)') return fBm_ND_circulant(output_size, H, periodic=periodic_tuple) if outer_scale is None: outer_scale = max(sim_size) if any(s % 2 != 0 for s in sim_size): raise ValueError("All dimensions must be even numbers; powers of 2 are recommended.") if not isinstance(C1, (int, float)) or C1 < 0: raise ValueError("C1 must be a non-negative number.") if not isinstance(alpha, (int, float)) or alpha <= 0 or alpha > 2: raise ValueError("alpha must be a number > 0 and <= 2.") if alpha < 0.5: raise ValueError("alpha < 0.5 is not implemented for FIF simulation") if alpha == 1: raise ValueError("alpha=1 not supported") # requires special treatment which is not implemented if not isinstance(H, (int, float)) or H < 0 or H > 1: raise ValueError("H must be a number between 0 and 1.") # Generate or validate Lévy noise if levy_noise is None: total_size = np.prod(sim_size) noise = extremal_levy(alpha, size=total_size).reshape(sim_size) else: levy_tensor = B.asarray(levy_noise) if all(periodic_tuple): if levy_tensor.shape != sim_size: raise ValueError("Provided levy_noise must match the specified size.") noise = levy_tensor else: if levy_tensor.shape != output_size: raise ValueError("Provided levy_noise must match the specified size.") total_size = np.prod(sim_size) noise = extremal_levy(alpha, size=total_size).reshape(sim_size) # Insert provided noise at the beginning noise[tuple(slice(s) for s in output_size)] = levy_tensor # Create flux kernel (kernel 1) # Calculate exponent and normalization parameters for N-D flux kernel alpha_prime = 1.0 / (1.0 - 1.0/alpha) flux_exponent = -scale_metric_dim / alpha_prime flux_norm_ratio_exp = -scale_metric_dim / alpha flux_final_power = 1.0 / (alpha - 1.0) if kernel_construction_method_flux == 'LS2010': kernel1 = create_kernel_LS2010(sim_size, flux_exponent, flux_norm_ratio_exp, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=flux_final_power, scale_metric=scale_metric) elif kernel_construction_method_flux == 'spectral': if scale_metric is not None: raise ValueError("Spectral N-D kernels do not support custom scale_metric") kernel1 = create_kernel_spectral(sim_size, flux_exponent, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=flux_final_power, scaling_dimension=scale_metric_dim) else: raise ValueError(f"Unknown kernel_construction_method_flux for N-D: {kernel_construction_method_flux}") # Perform first convolution integrated = periodic_convolve_nd(noise, kernel1) # Scale and exponentiate to get flux scaled = integrated * (C1 ** (1/alpha)) del integrated flux = B.exp(scaled) del scaled if H == 0: output_slices = tuple(slice(None) if periodic_tuple[i] else slice(output_size[i]) for i in range(ndim)) flux = flux[output_slices] return flux / B.mean(flux) # Create H kernel (kernel 2) # Calculate exponent and normalization parameters for N-D H kernel H_exponent = -scale_metric_dim + H H_norm_ratio_exp = -H if kernel_construction_method_observable == 'LS2010': kernel2 = create_kernel_LS2010(sim_size, H_exponent, H_norm_ratio_exp, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=None, scale_metric=scale_metric) elif kernel_construction_method_observable == 'spectral': if scale_metric is not None: raise ValueError("Spectral N-D kernels do not support custom scale_metric") kernel2 = create_kernel_spectral(sim_size, H_exponent, causal=False, outer_scale=outer_scale, outer_scale_width_factor=outer_scale_width_factor, final_power=None, scaling_dimension=scale_metric_dim) else: raise ValueError(f"Unknown kernel_construction_method_observable for N-D: {kernel_construction_method_observable}") # Perform second convolution observable = periodic_convolve_nd(flux, kernel2) # Extract output based on per-axis periodicity, then normalize by mean of returned portion output_slices = tuple(slice(None) if periodic_tuple[i] else slice(output_size[i]) for i in range(ndim)) observable = observable[output_slices] observable = observable / B.mean(observable) return observable