import numpy as np
from scipy import stats
import matplotlib.pylab as plt
from pysimfrac.src.general.helper_functions import print_error
def find_first_zero_crossing(lags, acf):
""" Finds the first zero crossing of the autocorrelation function and returns the corresponding distance where the crossing occurs
Parameters
--------------------
lags : numpy array
Distances of the autocorrelation functoin
acf : numpy array
Autocorrelation function values
Returns
--------------------
value : float
Distance where acf first crossing the zero line
Notes
--------------------
Returns -1 is acf does not cross 0
"""
for i, val in enumerate(acf):
if val < 0:
return lags[i]
return -1
def autocorr(x, lag=1):
""" Computes the autocorrelation function at a distance of lag
Parameters
--------------------
x : numpy array
field values (1D array)
lag : int
lag distance
Returns
--------------------
correlation : float
correlation at distance of lag
Notes
--------------------
None
"""
return np.corrcoef(np.array([x[0:len(x) - lag], x[lag:len(x)]]))[0, 1]
def compute_autocorr_y(A, num_lags=None):
# clear this garbage up dude
[n, _] = np.shape(A)
if not num_lags:
num_lags = int(0.25 * n)
tmp = np.zeros((n, num_lags))
for j in range(n):
B = A[j, :]
for i in range(num_lags):
tmp[j, i] = autocorr(B, i)
acf = np.zeros(num_lags)
for i in range(num_lags):
acf[i] = np.mean(tmp[:, i])
lags = np.array(range(num_lags)).astype(float)
return lags, acf
def compute_autocorr_x(A, num_lags=None):
[_, m] = np.shape(A)
if not num_lags:
num_lags = int(0.25 * m)
tmp = np.zeros((m, num_lags))
for j in range(m):
B = A[:, j]
for i in range(num_lags):
tmp[j, i] = autocorr(B, i)
acf = np.zeros(num_lags)
for i in range(num_lags):
acf[i] = np.mean(tmp[:, i])
lags = np.array(range(num_lags)).astype(float)
return lags, acf
def single_field_acf(A, num_lags, h):
""" Compute the correlation length of a field, both in x and y direction.
Parameters
--------------------
A : numpy array
field values (2D array)
num_lag : int
maximum number of lags
h : float
discretization length scale
Returns
--------------------
acf_dict : dict
dictionary with values of the correlation length, lags, and autocorrelation function in both x and y direction
Notes
--------------------
None
"""
## Compute Autocorrelation function in x direction
lags_x, acf_x = compute_autocorr_x(A, num_lags)
lags_x *= h
## compute first zero crossing of ACF in X
corr_x = find_first_zero_crossing(lags_x, acf_x)
## Compute Autocorrelation function in y direction
lags_y, acf_y = compute_autocorr_y(A, num_lags)
lags_y *= h
## compute first zero crossing of ACF in Y
corr_y = find_first_zero_crossing(lags_y, acf_y)
## create dictionary holding all the autocorrelation information
acf_dict = {
"x": {
"correlation": corr_x,
"lags": lags_x,
"acf": acf_x
},
"y": {
"correlation": corr_y,
"lags": lags_y,
"acf": acf_y
},
"anisotropy": corr_x / corr_y
}
return acf_dict
[docs]
def compute_acf(self, surface='all', num_lags=None):
""" Compute the correlation length of a field, both in x and y direction.
Parameters
--------------------
self : object
simFrac Class
surface : str
Named of desired surface to plot. Options are 'aperture', 'top', 'bottom', and 'all'(default).
num_lag : int
Maximum number of lags. If no value is provided, num_lags is set to 1/4 the domain size in that direction.
Returns
--------------------
None
Notes
--------------------
Autocorrelation functions and first crossing estimates of correlation lengths are attached to the simfrac object.
"""
# default is to comput the correlation lengths for all 3 surfaces
if surface == 'all':
surfaces = ["aperture", "top", "bottom"]
for sf in surfaces:
print(f"\n--> Computing correlation length of {sf} field.")
if sf == "aperture":
self.acf[sf] = single_field_acf(
self.aperture, num_lags, self.h)
elif sf == "top":
self.acf[sf] = single_field_acf(
self.top, num_lags, self.h)
elif sf == "bottom":
self.acf[sf] = single_field_acf(
self.bottom, num_lags, self.h)
print(
f"--> Correlation length of {sf} field in the X-direction : {self.acf[sf]['x']['correlation']:0.2e} [{self.units}] "
)
print(
f"--> Correlation length in {sf} field in the Y-direction : {self.acf[sf]['y']['correlation']:0.2e} [{self.units}] "
)
## if a single surface is provided, just compute the correlation for that surface
else:
print(f"\n--> Computing correlation length of {surface} field.")
if surface == "aperture":
self.acf[surface] = single_field_acf(
self.aperture, num_lags, self.h)
elif surface == "top":
self.acf[surface] = single_field_acf(
self.top, num_lags, self.h)
elif surface == "bottom":
self.acf[surface] = single_field_acf(
self.bottom, num_lags, self.h)
else:
print_error(
f"Error. Unknown surface provided - {surface}. Acceptable surfaces are 'top', 'bottom', 'aperture', or 'all'"
)
print(
f"--> Correlation length of {surface} field in the X-direction : {self.acf[surface]['x']['correlation']:0.2e} [{self.units}] "
)
print(
f"--> Correlation length in {surface} field in the Y-direction : {self.acf[surface]['y']['correlation']:0.2e} [{self.units}] "
)
[docs]
def plot_acf(self, surface="all", figname=None):
""" Creates a plot of the autocorrelation function for surfaces.
Parameters
--------------------
self : object
simFrac Class
surface : str
Name of desired surface acf to plot. Options are 'aperture', 'top', 'bottom', or 'all'. Default value is 'all'
figname : str
Name of figure to be saved. If figname is None (default), then no figure is saved.
Returns
--------------------
fig : Figure
ax : Axes
Notes
--------------------
The autocorrelation functions will be computed if they have not be computed already.
"""
if surface == "all":
fig, ax = plt.subplots(nrows=1, ncols=3, figsize=(12, 4))
fig.suptitle(f'Autocorrelation Function of Fracture Surfaces',
fontsize=16)
# Check for ACF for all surfaces
surfaces = ["top", "bottom", "aperture"]
for i, sf in enumerate(surfaces):
# If the ACF hasn't been computed yet, do that first.
if not self.acf[sf]["anisotropy"]:
print(
f"ACF for {sf} field was not computed yet. Doing that now."
)
self.compute_acf(sf)
## Make subplot for current surface
p = ax[i].plot(self.acf[sf]['x']['lags'],
self.acf[sf]['x']['acf'],
label="X-Direction")
ax[i].plot(self.acf[sf]['x']['correlation'],
0,
color=p[0].get_color(),
marker="s",
markersize=10)
p = ax[i].plot(self.acf[sf]['y']['lags'],
self.acf[sf]['y']['acf'],
label="Y-Direction")
ax[i].plot(self.acf[sf]['y']['correlation'],
0,
color=p[0].get_color(),
marker="d",
markersize=10)
# Set attributes
ax[i].set_title(f'{sf}')
if i == 0:
ax[i].set_ylabel('ACF', fontsize=18)
ax[i].set_xlabel(f"Distance [{self.units}]", fontsize=18)
if i == 2:
ax[i].legend(fontsize=16)
ax[i].grid(True)
ax[i].axis([
0,
max(max(self.acf[sf]['x']['lags']),
max(self.acf[sf]['y']['lags'])), -0.6, 1
])
else:
# Check for ACF for spe
if not self.acf[surface]["anisotropy"]:
print(
f"ACF for {surface} field was not computed yet. Doing that now."
)
self.compute_acf(surface)
fig, ax = plt.subplots(figsize=(10, 6))
fig.suptitle(f'Autocorrelation function of {surface} field',
fontsize=24)
p = ax.plot(self.acf[surface]['x']['lags'],
self.acf[surface]['x']['acf'],
label="X-Direction")
ax.plot(self.acf[surface]['x']['correlation'],
0,
color=p[0].get_color(),
marker="s",
markersize=20)
p = ax.plot(self.acf[surface]['y']['lags'],
self.acf[surface]['y']['acf'],
label="Y-Direction")
ax.plot(self.acf[surface]['y']['correlation'],
0,
color=p[0].get_color(),
marker="d",
markersize=20)
plt.ylabel('ACF', fontsize=18)
plt.xlabel(f"Distance [{self.units}]", fontsize=18)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
ax.legend(fontsize=18)
plt.grid(True)
# Save figure if the user wants it
if figname:
print(f"\n--> Saving figure to file {figname}")
plt.savefig(figname, dpi=150)
return fig, ax