Inversion App¶
Inversion is a process to find a model of subsurface resistivity that can fit the observed EM data. Two major goals of the inversion is 1) fitting the data for a given level of data errors/noise and 2) constraining resistivity values to vary smoothly in space. This app provides a capability to conduct 1D EM inversion. By inverting two voltage curves (i.e. EM data), we obtain a vertical resistivity profile that can fit the data.
App parameters¶
EM system
:AEM
ortTEM
Simulation or Inversion
:Sim
: Simulation modeInv
: Inversion mode
z
: depth to the layerh
: thickness of the layerrho_b
: background resistivityrho
: layer resistivity% error
: percentage error
Plotting parameters¶
Resistivity range
: range of resistivity shown in the left panelDepth range
: range of depth shown in the left panel
from emrecharge.app import AppState
import numpy as np
import matplotlib as mpl
from matplotlib import pyplot as plt
mpl.rcParams.update({'font.size':12})
from SimPEG import maps
import SimPEG.electromagnetics.time_domain as tdem
import json
import SimPEG.electromagnetics.time_domain as tdem
from SimPEG import utils
from ipywidgets import interactive, widgets
from IPython.display import display, HTML, JSON
from ipywidgets import (
Label, VBox, HBox,
IntSlider, RadioButtons, Checkbox, Output, SelectionSlider, Dropdown, Button,
AppLayout, Layout, interactive_output, FloatRangeSlider, BoundedIntText, BoundedFloatText
)
from SimPEG.electromagnetics.utils import em1d_utils
from discretize import TensorMesh
from discretize.utils import volume_average
import dill
import matplotlib.pyplot as plt
# inverse problem
from discretize import TensorMesh
import SimPEG.electromagnetics.time_domain as tdem
from SimPEG.utils import mkvc, plot_1d_layer_model
from SimPEG import (
maps,
data,
data_misfit,
inverse_problem,
regularization,
optimization,
directives,
inversion,
utils,
)
from IPython.display import display, clear_output
from traitlets import Int, Float
from scipy.stats import percentileofscore
fname_aem = './aem_system.json'
fname_ttem = './ttem_system.json'
def get_vertical_discretization(n_layer, minimum_dz, geomtric_factor):
"""
Creates a list of vertical discretizations generate from a geometric series.
>>> minimum_dz * geomtric_factor ** np.arange(n_layer)
Parameters
----------
n_layer : int
The number of discretizations
minimum_dz : float
The smallest discretization
geometric_factor : float
The expansion factor of the discretizations
Returns
(n_layer) : numpy.ndarray
The cell widths
"""
hz = minimum_dz * (geomtric_factor) ** np.arange(n_layer)
return hz
class EMSIMULATION:
"""EMSIMULATION"""
def __init__(self):
self.n_layer = 3
self._initialize()
self.dobs = None
self.dpred_inv = None
self.rho_inv = None
self.rmse = None
# Widget variables
def _initialize(self):
# AEM
with open(fname_aem, 'r') as infile:
system_aem = json.load(infile)
unit_conversion = 1e-12
area = system_aem['tx_area']
i_start_hm = 10
i_start_lm = 10
i_end_hm = 10 + 27
i_end_lm = 10 + 20 -2
waveform_hm = np.array(system_aem['hm']['waveform'])
waveform_lm = np.array(system_aem['lm']['waveform'])
time_input_currents_HM = waveform_hm[:,0]
input_currents_HM = waveform_hm[:,1]
time_input_currents_LM = waveform_lm[:,0]
input_currents_LM = waveform_lm[:,1]
time_gates = np.array(system_aem['time_gates'])
GateTimeShift=system_aem['lm']['GateTimeShift']
MeaTimeDelay=system_aem['lm']['MeaTimeDelay']
NoGates=int(system_aem['lm']['NoGates'])
t0_lm = waveform_lm[:,0].max()
times_lm = (time_gates[:NoGates,0] + GateTimeShift + MeaTimeDelay)[i_start_lm:i_end_lm]
GateTimeShift=system_aem['hm']['GateTimeShift']
MeaTimeDelay=system_aem['hm']['MeaTimeDelay']
NoGates=int(system_aem['hm']['NoGates'])
t0_hm = waveform_hm[:,0].max()
times_hm = (time_gates[:NoGates,0] + GateTimeShift + MeaTimeDelay)[i_start_hm:i_end_hm]
receiver_location = np.array([13.25, 0., 30.])
receiver_orientation = "z" # "x", "y" or "z"
times = np.logspace(-5, -2, 31) # time channels
source_location = np.array([0., 0., 30.])
source_radius = 10.
current_amplitude = 1.
source_orientation = 'z'
source_list = []
waveform_lm = tdem.sources.PiecewiseLinearWaveform(times=time_input_currents_LM, currents=input_currents_LM)
waveform_hm = tdem.sources.PiecewiseLinearWaveform(times=time_input_currents_HM, currents=input_currents_HM)
# Define receivers at each location.
dbzdt_receiver_lm = tdem.receivers.PointMagneticFluxTimeDerivative(
receiver_location, times_lm, receiver_orientation,
)
dbzdt_receiver_hm = tdem.receivers.PointMagneticFluxTimeDerivative(
receiver_location, times_hm, receiver_orientation,
)
# Must define the transmitter properties and associated receivers
src_0 = tdem.sources.MagDipole(
[dbzdt_receiver_lm],
location=source_location,
waveform=waveform_lm,
orientation=source_orientation,
)
src_1 = tdem.sources.MagDipole(
[dbzdt_receiver_hm],
location=source_location,
waveform=waveform_hm,
orientation=source_orientation,
)
# this needs to be fixed later
source_list.append(src_0)
source_list.append(src_1)
survey = tdem.Survey(source_list)
wire_map = maps.Wires(("sigma", self.n_layer), ("thickness", self.n_layer-1))
self._simulation_aem = tdem.Simulation1DLayered(
survey=survey, thicknessesMap=wire_map.thickness, sigmaMap=wire_map.sigma,
)
# tTEM
with open(fname_ttem, 'r') as infile:
system_ttem = json.load(infile)
area = system_ttem['tx_area']
i_start_hm = int(system_ttem['hm']['RemoveInitialGates'])
i_start_lm = int(system_ttem['lm']['RemoveInitialGates'])
i_end_lm = int(system_ttem['lm']['RemoveGatesFrom'])
waveform_hm = np.array(system_ttem['hm']['waveform'])
waveform_lm = np.array(system_ttem['lm']['waveform'])
time_input_currents_hm = waveform_hm[:,0] - waveform_hm[:,0].max()
input_currents_hm = waveform_hm[:,1]
time_input_currents_lm = waveform_lm[:,0] - waveform_lm[:,0].max()
input_currents_lm = waveform_lm[:,1]
time_gates = np.array(system_ttem['time_gates'])
GateTimeShift=system_ttem['lm']['GateTimeShift']
MeaTimeDelay=system_ttem['lm']['MeaTimeDelay']
NoGates=int(system_ttem['lm']['NoGates'])
t0_lm = waveform_lm[:,0].max()
times_lm = (time_gates[:NoGates,0] + GateTimeShift + MeaTimeDelay)[i_start_lm:i_end_lm]
GateTimeShift=system_ttem['hm']['GateTimeShift']
MeaTimeDelay=system_ttem['hm']['MeaTimeDelay']
NoGates=int(system_ttem['hm']['NoGates'])
t0_hm = waveform_hm[:,0].max()
times_hm = (time_gates[:NoGates,0] + GateTimeShift + MeaTimeDelay)[i_start_hm:]
receiver_location = np.array([9.28, 0., 0.45])
receiver_orientation = "z" # "x", "y" or "z"
source_location = np.array([0., 0., 0.5])
source_orientation = 'z'
source_list = []
waveform_lm = tdem.sources.PiecewiseLinearWaveform(times=time_input_currents_lm, currents=input_currents_lm)
waveform_hm = tdem.sources.PiecewiseLinearWaveform(times=time_input_currents_hm, currents=input_currents_hm)
# Define receivers at each location.
dbzdt_receiver_lm = tdem.receivers.PointMagneticFluxTimeDerivative(
receiver_location, times_lm, receiver_orientation,
)
dbzdt_receiver_hm = tdem.receivers.PointMagneticFluxTimeDerivative(
receiver_location, times_hm, receiver_orientation,
)
# Must define the transmitter properties and associated receivers
src_0 = tdem.sources.MagDipole(
[dbzdt_receiver_lm],
location=source_location,
waveform=waveform_lm,
orientation=source_orientation,
)
src_1 = tdem.sources.MagDipole(
[dbzdt_receiver_hm],
location=source_location,
waveform=waveform_hm,
orientation=source_orientation,
)
# this needs to be fixed later
source_list.append(src_0)
source_list.append(src_1)
survey_ttem = tdem.Survey(source_list)
self._simulation_ttem = tdem.Simulation1DLayered(
survey=survey_ttem, thicknessesMap=wire_map.thickness, sigmaMap=wire_map.sigma,
)
thicknesses_aem_inv = get_vertical_discretization(30, 3, 1.1)
self._simulation_aem_inv = tdem.Simulation1DLayered(
survey=survey, thicknesses=thicknesses_aem_inv, sigmaMap=maps.ExpMap()
)
thicknesses_ttem_inv = get_vertical_discretization(30, 1, 1.07)
self._simulation_ttem_inv = tdem.Simulation1DLayered(
survey=survey_ttem, thicknesses=thicknesses_ttem_inv, sigmaMap=maps.ExpMap()
)
@property
def times_lm(self):
src = self._simulation.survey.source_list[0]
return src.receiver_list[0].times
@property
def times_hm(self):
src = self._simulation.survey.source_list[1]
return src.receiver_list[0].times
def simulate(self, em_system, z, h, rho_background, rho_layer):
thicknesses = np.array([z, h])
rho = np.array([rho_background, rho_layer, rho_background])
sigma = 1./rho
model = np.r_[sigma, thicknesses]
if em_system == 'AEM':
self._simulation = self._simulation_aem
elif em_system == 'tTEM':
self._simulation = self._simulation_ttem
dpred = self._simulation.dpred(model)
return dpred
def invert(self, em_system, dobs, relative_error, noise_floor, rho_0):
if em_system == 'AEM':
self._simulation_inv = self._simulation_aem_inv
elif em_system == 'tTEM':
self._simulation_inv = self._simulation_ttem_inv
uncertainties = relative_error * np.abs(dobs) * np.ones(len(dobs))
data_object = data.Data(self._simulation_inv.survey, dobs=dobs, standard_deviation=uncertainties)
hz_inv = np.r_[self._simulation_inv.thicknesses, self._simulation_inv.thicknesses[-1]]
mesh = TensorMesh([(hz_inv)], "0")
starting_model = np.log(1./rho_0 * np.ones(mesh.nC))
dmis = data_misfit.L2DataMisfit(simulation=self._simulation_inv, data=data_object)
reg_map = maps.IdentityMap(nP=mesh.nC)
reg = regularization.Sparse(mesh, mapping=reg_map, alpha_s=1e-5, alpha_x=1.0)
opt = optimization.ProjectedGNCG(maxIter=10, maxIterCG=50, tolCG=1e-5)
opt.printers = opt.printers[:1]
inv_prob = inverse_problem.BaseInvProblem(dmis, reg, opt)
starting_beta = directives.BetaEstimate_ByEig(beta0_ratio=1e0)
cooling_beta = directives.BetaSchedule(coolingFactor=2, coolingRate=1)
target = directives.TargetMisfit(chifact=1.2)
# The directives are defined as a list.
directives_list = [
starting_beta,
cooling_beta,
target,
]
inv = inversion.BaseInversion(inv_prob, directives_list)
recovered_model = inv.run(starting_model)
recovered_resistivity = 1./np.exp(recovered_model)
return recovered_resistivity, inv_prob.dpred
em_sim = EMSIMULATION()
from traitlets import HasTraits, Int, Unicode, List, Float, Bool, TraitType, Dict
def get_cmap_range_value(vmin, vmax, vrange):
return [vmin + 0.0*vrange, vmax]
res_min, res_max = 1, 1000
res_range = res_max-res_min
depth_min, depth_max = 0, 300
depth_range = depth_max-depth_min
state = AppState()
np.random.seed(1)
w_em_system=widgets.RadioButtons(options=["AEM", "tTEM"], value='AEM', layout={"width":"200px"})
w_sim_inv=widgets.RadioButtons(options=["Sim", "Inv"], value='Sim', layout={"width":"200px"})
state.register_stateful_widget(w_sim_inv, "sim_inv", Unicode(w_sim_inv.value))
state.register_stateful_widget(w_em_system, "em_system", Unicode(w_em_system.value))
# w_run_inv=widgets.Checkbox(value=False, description='Run inversion', layout={"width":"200px", "padding":"0px"})
# state.register_stateful_widget(w_run_inv, "run_inv", Bool(w_run_inv.value))
w_rho_range=widgets.IntRangeSlider(
min=res_min, max=res_max,
value=(3, 300),
step=1,
continuous_update=False,
disabled=False
)
state.register_stateful_widget(w_rho_range, "rho_range", List(get_cmap_range_value(res_min, res_max, res_range)))
w_depth_range=widgets.IntRangeSlider(
min=depth_min, max=depth_max,
value=(0, 200),
step=1,
continuous_update=False,
disabled=False
)
state.register_stateful_widget(w_depth_range, "depth_range", List(get_cmap_range_value(depth_min, depth_max, depth_range)),)
w_z_1=widgets.FloatText(min=1, max=300, step=1, value=20, description='$z$', continuous_update=False, layout={"width":"300px"})
state.register_stateful_widget(w_z_1, "z_1", Float(w_z_1.value))
w_h_1=widgets.FloatText(min=1, max=300, step=1, value=20, description='$h$', continuous_update=False, layout={"width":"300px"})
state.register_stateful_widget(w_h_1, "h_1", Float(w_h_1.value))
w_rho_background_1=widgets.FloatText(min=1, max=1000, step=1, value=20, description='$\\rho_b$', continuous_update=False, layout={"width":"300px"})
state.register_stateful_widget(w_rho_background_1, "rho_background_1", Float(w_rho_background_1.value))
w_rho_layer_1=widgets.FloatText(min=1, max=1000, step=1, value=50, description='$\\rho$', continuous_update=False, layout={"width":"300px"})
state.register_stateful_widget(w_rho_layer_1, "rho_layer_1", Float(w_rho_layer_1.value))
# w_rho_0=widgets.FloatText(min=1, max=1000, step=1, value=50, description='$\\rho_0$', continuous_update=False, layout={"width":"300px"})
# state.register_stateful_widget(w_rho_0, "rho_0", Float(w_rho_0.value))
w_relative_error=widgets.FloatText(min=1, max=100, step=1, value=3, description='% error', continuous_update=False, layout={"width":"300px"})
state.register_stateful_widget(w_relative_error, "relative_error", Float(w_relative_error.value))
# w_noise_floor=widgets.FloatText(value=1e-15, description='floor', continuous_update=False, layout={"width":"300px"})
# state.register_stateful_widget(w_noise_floor, "noise_floor", Float(w_noise_floor.value))
def apply_change(change):
if change["new"]=='Inv':
# w_rho_0.disabled = True
# w_noise_floor.disabled = True
w_relative_error.disabled = True
w_depth_range.disabled = True
w_rho_range.disabled = True
w_em_system.disabled = True
else:
# w_rho_0.disabled = False
# w_noise_floor.disabled = False
w_relative_error.disabled = False
w_depth_range.disabled = False
w_rho_range.disabled = False
w_em_system.disabled = False
state.widgets["sim_inv"].observe(apply_change, names=['value'])
def apply_change_system(change):
if change["new"]=='AEM':
w_depth_range.value = (0, 300)
# w_noise_floor.value = 1e-15
else:
w_depth_range.value = (0, 70)
# w_noise_floor.value = 1e-11
state.widgets["em_system"].observe(apply_change_system, names=['value'])
def plot(**kwargs):
rho_layer_1 = kwargs['rho_layer_1']
rho_background_1 = kwargs['rho_background_1']
z_1 = kwargs['z_1']
h_1 = kwargs['h_1']
em_system = kwargs['em_system']
rho_range = kwargs['rho_range']
depth_range = kwargs['depth_range']
sim_inv = kwargs['sim_inv']
relative_error = kwargs['relative_error']
# noise_floor = kwargs['noise_floor']
# run_inv = kwargs['run_inv']
thickness_1 = np.r_[z_1, h_1]
rho = np.r_[rho_background_1, rho_layer_1, rho_background_1]
rho_0 = np.r_[rho_background_1, rho_background_1, rho_background_1]
fig, axs = plt.subplots(1,2, figsize=(10, 5))
ax1, ax2 = axs
rng = np.random.default_rng(42)
if sim_inv == 'Sim':
dpred = em_sim.simulate(em_system, z_1, h_1, rho_background_1, rho_layer_1)
dpred_0 = em_sim.simulate(em_system, z_1, h_1, rho_background_1, rho_background_1)
difference = (dpred-dpred_0) / dpred_0 * 100
detectability = np.sqrt((difference**2).sum()/len(difference))
em_sim.dobs = dpred + relative_error*0.01*abs(dpred)*rng.standard_normal(len(dpred))
# + rng.standard_normal(len(dpred)) * noise_floor
em_sim.dpred_0 = dpred_0
em_sim.rho_inv = None
em_sim.dpred_inv = None
em_sim.uncert = relative_error*0.01*abs(em_sim.dobs)
# + noise_floor
# em_sim.rmse = np.sqrt(np.linalg.norm((dpred_0-em_sim.dobs) / em_sim.uncert)**2 / len(em_sim.dobs))
utils.plot_1d_layer_model(thickness_1, rho, ax=ax1, **{'color':'C0', 'label': 'True'})
if sim_inv == 'Sim':
utils.plot_1d_layer_model(thickness_1, rho_0, ax=ax1, **{'color':'C0', 'label': 'Background', 'linestyle':'--'})
ax2.loglog(em_sim.times_lm * 1e3, -em_sim.dobs[:em_sim.times_lm.size], color='C0', label='LM-Obs.')
ax2.loglog(em_sim.times_hm * 1e3, -em_sim.dobs[em_sim.times_lm.size:], color='C1', label='HM-Obs.')
if sim_inv == 'Sim':
ax2.loglog(em_sim.times_lm * 1e3, -em_sim.dpred_0[:em_sim.times_lm.size], color='C0', label='LM-Ini.', linestyle='--')
ax2.loglog(em_sim.times_hm * 1e3, -em_sim.dpred_0[em_sim.times_lm.size:], color='C1', label='HM-Ini.', linestyle='--')
if sim_inv == 'Inv':
rho_inv, dpred_inv = em_sim.invert(em_system, em_sim.dobs, relative_error*0.01, 0., rho_background_1);
em_sim.rho_inv = rho_inv
em_sim.dpred_inv = dpred_inv
em_sim.rmse = np.sqrt(np.linalg.norm((dpred_inv-em_sim.dobs) / em_sim.uncert)**2 / len(em_sim.dobs))
ax2.loglog(em_sim.times_lm * 1e3, -em_sim.dpred_inv[:em_sim.times_lm.size], color='C0', label='LM-Pred.', marker='o', markerfacecolor='None', linestyle='None')
ax2.loglog(em_sim.times_hm * 1e3, -em_sim.dpred_inv[em_sim.times_lm.size:], color='C1', label='HM-Pred.', marker='o', markerfacecolor='None', linestyle='None')
utils.plot_1d_layer_model(em_sim._simulation_inv.thicknesses, em_sim.rho_inv, ax=ax1, **{'color':'r', 'label': 'Pred.'})
ax2.set_ylabel("Voltage (V/A-m$^4$)")
if sim_inv == 'Sim':
ax2.set_title("EM data (Detectability={:.1f}%)".format(detectability))
elif sim_inv == 'Inv':
ax2.set_title("EM data (RMSE={:.1e})".format(em_sim.rmse))
ax2.set_xlabel("Time (ms)")
ax1.set_xlabel("Resistivity (Ohm-m)")
ax2.grid(which='both', alpha=0.2)
ax1.set_title("Vertical resistivity profile")
ax1.set_xlim(rho_range[0], rho_range[1])
ax1.set_ylim(depth_range[1], depth_range[0])
ax1.legend(fontsize=10)
ax2.legend(fontsize=10)
plt.tight_layout()
plt.show()
return
# Some texts
top_label_1 = Label(value="EM system:", layout={"width":"100px"})
top_right_label_1 = Label(value="Simulation or Inversion:", layout={"width":"150px"})
top_label_2 = Label(value="Resistivity range", layout={"width":"100px"})
top_label_3 = Label(value="Depth range", layout={"width":"100px"})
edit_plot_limit_label = Label(value="Edit plot access limit:", layout={"width":"150px", "padding":"0px"})
top_panel_1 = HBox([top_label_1, w_em_system, top_right_label_1, w_sim_inv], layout={"align_items":"center"})
top_panel_2 = HBox([top_label_2, w_rho_range], layout={"align_items":"center"})
top_panel_3 = HBox([top_label_3, w_depth_range], layout={"align_items":"center"})
# top_panel_4 = HBox([run_simulation_label, w_run_simulation])
left_label = Label(value="Parameters:")
left_panel = VBox(
[w_z_1, w_h_1, w_rho_background_1, w_rho_layer_1, w_relative_error], layout={"align_items":"center"}
)
# right_label = Label(value="Inversion parameters:")
# right_panel = VBox(
# [right_label, w_relative_error, w_run_inv], layout={"align_items":"center"}
# )
w_output = interactive_output(
plot,
{ k: state.widgets[k] for k in state.widgets.keys()}
);
bottom_panel = HBox([left_panel], layout={"align_items":"center"})
# edit_panel = HBox([edit_plot_limit_label, w_edit_plot], layout={"align_items":"center"})
panel = VBox([top_panel_1, bottom_panel, w_output, edit_plot_limit_label, top_panel_2, top_panel_3])
display(panel)
Loading...
# import numpy as np
# import matplotlib.pyplot as plt
# from ipywidgets import Button, interactive_output, HBox, VBox
# from ipywidgets.widgets import FloatSlider
# def plot_sine_wave(freq, phase, amplitude):
# x = np.linspace(0, 2*np.pi, 1000)
# y = amplitude * np.sin(freq*x + phase)
# plt.plot(x, y)
# plt.show()
# freq_slider = FloatSlider(value=1.0, min=0.1, max=10.0, step=0.1, description='Frequency:')
# phase_slider = FloatSlider(value=0.0, min=0.0, max=2*np.pi, step=0.1, description='Phase:')
# amplitude_slider = FloatSlider(value=1.0, min=0.1, max=10.0, step=0.1, description='Amplitude:')
# plot_button = Button(description='Plot')
# output = interactive_output(plot_sine_wave, {'freq': freq_slider, 'phase': phase_slider, 'amplitude': amplitude_slider})
# def on_button_clicked(b):
# with output:
# freq = freq_slider.value
# phase = phase_slider.value
# amplitude = amplitude_slider.value
# plot_sine_wave(freq, phase, amplitude)
# plot_button.on_click(on_button_clicked)
# inputs = HBox([freq_slider, phase_slider, amplitude_slider, plot_button])
# display(VBox([inputs, output]))