"""analysis module
This module provides functions to analyze ghosts spots on the full focal plane, like getting ghosts positions
and features, computing separations between ghosts spots, associating ghosts spots and computing distances
between to sets of ghosts spots.
"""
import concurrent.futures
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
from ghosts.tools import get_ranges
from ghosts.beam import get_n_phot_for_power_nw_wl_nm
from ghosts.constants import LSST_CAMERA_PIXEL_DENSITY_MM2, LSST_CAMERA_PIXEL_QE
[docs]def get_full_light_on_camera(r_forward):
""" Convert list of forward batoid ray vectors to list of impact points on camera
Parameters
----------
r_forward : `list` of `batoid.RayVector`
a list of batoid RayVector with a bunch of rays propagated through the system.
Returns
-------
all_x : `list`
list of x coordinates of all light rays impact points on the detector
all_y : `list`
list of y coordinates of all light rays impact points on the detector
all_f : `list`
list of fluxes coordinates of all light rays impact points on the detector
"""
# Plot light on detector on the right
all_x = r_forward[0].x.tolist()
all_y = r_forward[0].y.tolist()
all_f = r_forward[0].flux.tolist()
for rr in r_forward[1:]:
all_x = all_x + rr.x.tolist()
all_y = all_y + rr.y.tolist()
all_f = all_f + rr.flux.tolist()
return all_x, all_y, all_f
[docs]def get_ghost_name(ghost, debug=False):
""" Just the name of a ghost spot as a tuple containing the
names of the surface on which the first and second light reflection occured.
Parameters
----------
ghost : `batoid.RayVector`
a batoid RayVector with a bunch of rays propagated through the system.
debug : `bool`
debug mode or not
Returns
-------
ghost_name_tuple : `tuple` of `strings`
name of the ghost as a tuple of strings, e.g. ('Filter_entrance', 'L2_exit') or ('L2_entrance', 'L1_entrance')
"""
ghost_name_tuple = ('main', 'main')
for i, opt in enumerate(ghost.path):
if debug:
print(ghost.path[i - 2], opt)
if i >= 2 and ghost.path[i - 2] == opt:
if debug:
print(f'{i} bounce on {ghost.path[i - 1]} between {ghost.path[i - 2]} and {opt}')
if ghost_name_tuple == ('main', 'main'):
ghost_name_tuple = (ghost.path[i - 1],)
else:
ghost_name_tuple = (ghost_name_tuple[0], ghost.path[i - 1])
return ghost_name_tuple
[docs]def get_ghost_stats(ghost):
""" Compute some basic stats for a simulated ghost spot image
.. todo::
`get_ghost_stats` is only working on simulated images for now
Parameters
----------
ghost : `batoid.RayVector`
A batoid RayVector with a bunch of rays propagated through the system.
Returns
-------
mean_x, std_x, mean_y, std_y : `floats`
spot position with uncertainty as standard deviation in x and y
x_width, y_width : `floats`
spot width in x and y
radius, radius_err : `float`
beam spot radius from x and y widths, and an estimator of the uncertainty
weights_sum : `float`
total flux as computed by batoid `flux.sum()`
mean_intensity : `float`
average flux of all rays
spot_surface_mm2 : `float`
spot surface, using `x_width` as the diameter
density_phot_mm2 : `float`
density of photons per :math:`mm^2`, for one simulated photon, as the `mean_intensity`/`spot_surface_mm2`
"""
mean_x = ghost.x.mean()
std_x = ghost.x.std()
mean_y = ghost.y.mean()
std_y = ghost.x.std()
x_width = ghost.x.max() - ghost.x.min()
y_width = ghost.y.max() - ghost.y.min()
radius = (x_width/2. + y_width/2.) / 2. # simple mean of the radius
radius_err = math.fabs(x_width - y_width) / 2.
weights_sum = ghost.flux.sum()
mean_intensity = weights_sum / len(ghost.x)
spot_surface_mm2 = math.pi * (radius * 1000.) * (radius * 1000.)
density_phot_mm2 = mean_intensity / spot_surface_mm2
return mean_x, std_x, mean_y, std_y, x_width, y_width, radius, radius_err, \
weights_sum, mean_intensity, spot_surface_mm2, density_phot_mm2
[docs]def get_ghost_spot_data(i, ghost, p=100, wl=600):
""" Get some basic information for a simulated ghost spot image
.. todo::
`get_ghost_spot_data` should be made wavelength dependent
Parameters
----------
i : `int`
the ghost index, useful really only on simulated data
ghost : `batoid.RayVector`
a batoid RayVector with a bunch of rays propagated through the system.
p : `float`
beam power in nW to compute the photon density
wl : `int`
the beam wavelength in nm
Returns
-------
ghost_spot_data : `dict`
a dictionary with ghost spot information : index, name, pos_x, width_x, pos_x, width_y, surface,
n_pixels, pixel_signal and photon_density
"""
# identify ghost
ghost_name = get_ghost_name(ghost)
# normalized stats
mean_x, std_x, mean_y, std_y, x_width, y_width, radius, radius_err, \
weights_sum, mean_intensity, spot_surface_mm2, density_phot_mm2 = get_ghost_stats(ghost)
# number of pixels
n_pixels = round(spot_surface_mm2 * LSST_CAMERA_PIXEL_DENSITY_MM2)
# number of photons for 100 nW at 600 nm
n_phot_total = get_n_phot_for_power_nw_wl_nm(p, wl)
n_e_pixel = density_phot_mm2 / LSST_CAMERA_PIXEL_DENSITY_MM2 * n_phot_total * LSST_CAMERA_PIXEL_QE
ghost_spot_data = {'index': i, 'name': ghost_name,
'pos_x': mean_x, 'std_x': std_x, 'width_x': x_width,
'pos_y': mean_y, 'std_y': std_y, 'width_y': y_width,
'radius': radius, 'radius_err': radius_err,
'flux': weights_sum,
'surface': spot_surface_mm2, 'n_pixels': n_pixels,
'pixel_signal': n_e_pixel, 'photon_density': density_phot_mm2}
return ghost_spot_data
[docs]def map_ghost(ghost, ax, n_bins=100, dr=0.01):
""" Builds a binned image, as a `matplotlib.hexbin` of a ghost
Parameters
----------
ghost : `batoid.RayVector`
a batoid RayVector with a bunch of rays propagated through the system.
ax : `matplotlib.axis`
an axis object to draw the histogram
n_bins : `int`
the number of bins of the histogram, that is also the number of "pixel" of the "image"
dr : `float`
the extra space around the ghost spot image, to get nice axis ranges
Returns
-------
ghost_map : `matplotlib.axis.hexbin`
a binned image (2D histogram) of the ghost on the detector plane
"""
# bin data
ghost_map = ax.hexbin(ghost.x, ghost.y, C=ghost.flux, reduce_C_function=np.sum,
gridsize=n_bins, extent=get_ranges(ghost.x, ghost.y, dr))
return ghost_map
[docs]def reduce_ghosts(r_forward):
""" Builds a binned image, as a `matplotlib.hexbin` of a ghost
Parameters
----------
r_forward : `batoid.RayVector`
a batoid RayVector with a bunch of rays propagated through the system.
Returns
-------
spots_data : `list` of `dict`
a list of dictionaries of ghost spot data (position, radius, brightness)
ghost_maps : `list` of `matplotlib.axis.hexbin`
a list of images of ghosts as 2D histograms
"""
# store some stats roughly
spots_data = []
ghost_maps = []
_fig, ax = plt.subplots(len(r_forward))
axs = ax.ravel()
for i, ghost in enumerate(r_forward):
# bin data (and make plot)
hb_map = map_ghost(ghost, axs[i])
ghost_spot_data = get_ghost_spot_data(i, ghost)
ghost_maps.append(hb_map)
spots_data.append(ghost_spot_data)
plt.close(_fig)
return spots_data, ghost_maps
[docs]def make_data_frame(spots_data, beam_id=0, geom_id=0):
""" Create a pandas data frame from the ghost spots data dictionary
and a beam configuration.
Parameters
----------
spots_data : `dict`
a dictionary with ghost spots data
beam_id : `int`
a beam configuration id
geom_id : `int`
a geometry configuration id
Returns
-------
data_frame : `pandas.DataFrame`
a panda data frame with ghost spot data information, including beam and geometry configuration ids
"""
# creating a nice pandas data frame
data_frame = pd.DataFrame(
{
"beam_id": beam_id,
"geom_id": geom_id,
"index": np.array([data['index'] for data in spots_data], dtype="int"),
"name": [data['name'] for data in spots_data],
"pos_x": np.array([data['pos_x'] for data in spots_data], dtype="float"),
"std_x": np.array([data['std_x'] for data in spots_data], dtype="float"),
"pos_y": np.array([data['pos_y'] for data in spots_data], dtype="float"),
"std_y": np.array([data['std_y'] for data in spots_data], dtype="float"),
"width_x": np.array([data['width_x'] for data in spots_data], dtype="float"),
"width_y": np.array([data['width_y'] for data in spots_data], dtype="float"),
"radius": np.array([data['radius'] for data in spots_data], dtype="float"),
"radius_err": np.array([data['radius_err'] for data in spots_data], dtype="float"),
"flux": np.array([data['flux'] for data in spots_data], dtype="float"),
"surface": np.array([data['surface'] for data in spots_data], dtype="float"),
"n_pixels": np.array([data['n_pixels'] for data in spots_data], dtype="int"),
"pixel_signal": np.array([data['pixel_signal'] for data in spots_data], dtype="float"),
}
)
return data_frame
[docs]def compute_ghost_separations(data_frame):
""" Compute ghosts images separations and various ratios from a ghosts spot data frame
.. todo::
`compute_ghost_separations` remove main image from the distance since it's saturated in data.
Parameters
----------
data_frame : `pandas.DataFrame`
a ghost spots data frame
Returns
-------
data_frame : `pandas.DataFrame`
a pandas data frame with information on ghost spots data separations and ratios
"""
# computing distances ghost to ghost, and ghosts overlap
dist_data = []
n = len(data_frame['pos_x']) - 1
for i in range(n):
for k in range(1, n - i):
# distance center to center
distance = math.dist([data_frame['pos_x'][i], data_frame['pos_y'][i]],
[data_frame['pos_x'][i + k], data_frame['pos_y'][i + k]])
# distance border to border, assuming round spot - overlap
r1 = data_frame['radius'][i]
r2 = data_frame['radius'][i + k]
overlap = distance - (r1 + r2)
# surface and pixel signal ratio
flux_ratio = data_frame['flux'][i + k] / data_frame['flux'][i]
surface_ratio = data_frame['surface'][i + k] / data_frame['surface'][i]
signal_ratio = data_frame['pixel_signal'][i + k] / data_frame['pixel_signal'][i]
# ghost names
name_1 = data_frame['name'][i]
name_2 = data_frame['name'][i + k]
# add data container
dist_data.append([i, i + k, name_1, name_2, distance, overlap, flux_ratio, surface_ratio, signal_ratio])
ghosts_separation = pd.DataFrame(
{
"ghost_1": np.array([data[0] for data in dist_data], dtype="int"),
"ghost_2": np.array([data[1] for data in dist_data], dtype="int"),
"name_1": [data[2] for data in dist_data],
"name_2": [data[3] for data in dist_data],
"distance": np.array([data[4] for data in dist_data], dtype="float"),
"overlap": np.array([data[5] for data in dist_data], dtype="float"),
"flux_ratio": np.array([data[6] for data in dist_data], dtype="float"),
"surface_ratio": np.array([data[7] for data in dist_data], dtype="float"),
"signal_ratio": np.array([data[8] for data in dist_data], dtype="float"),
}
)
return ghosts_separation
[docs]def compute_distance_spot_to_spot(df_slice_1, df_slice_2, radius_scale_factor=100):
""" Compute a 3D geometric distance between 2 ghosts spots centers, considering the spot radius
as the 3rd dimension.
Parameters
----------
df_slice_1 : `pandas.DataFrame`
a ghost spots data frame slice, with one line corresponding to one ghost
df_slice_2 : `pandas.DataFrame`
a ghost spots data frame slice, with one line corresponding to one ghost
radius_scale_factor : `float`
as the radius is considered a 3rd dimension, we scale it to the same range as the x and y axis, e.g. the spot
radius is 2.5 mm to scale to the 60 cm of the focal plane ~ 100
Returns
-------
dist_2d : `float`
the distance between 2 spots for the 2D distance
dist_2d_err : `float`
the error on that distance from the std error on the position centers and radius for the 2D distance
dist_3d : `float`
the distance between 2 spots for the 3D distance
dist_3d_err : `float`
the error on that distance from the std error on the position centers and radius for the 3D distance
"""
dist_2d = math.dist([df_slice_1['pos_x'], df_slice_1['pos_y']],
[df_slice_2['pos_x'], df_slice_2['pos_y']])
d1_2d_sq = df_slice_1['std_x'] * df_slice_1['std_x'] + df_slice_1['std_y'] * df_slice_1['std_y']
d2_2d_sq = df_slice_2['std_x'] * df_slice_2['std_x'] + df_slice_2['std_y'] * df_slice_2['std_y']
dist_2d_err = math.sqrt(d1_2d_sq + d2_2d_sq)
dist_3d = math.dist([df_slice_1['pos_x'], df_slice_1['pos_y'], df_slice_1['radius'] * radius_scale_factor],
[df_slice_2['pos_x'], df_slice_2['pos_y'], df_slice_2['radius'] * radius_scale_factor])
d1_3d_sq = df_slice_1['std_x'] * df_slice_1['std_x'] + df_slice_1['std_y'] * df_slice_1['std_y'] \
+ df_slice_1['radius_err'] * df_slice_1['radius_err'] * radius_scale_factor * radius_scale_factor
d2_3d_sq = df_slice_2['std_x'] * df_slice_2['std_x'] + df_slice_2['std_y'] * df_slice_2['std_y'] \
+ df_slice_2['radius_err'] * df_slice_2['radius_err'] * radius_scale_factor * radius_scale_factor
dist_3d_err = math.sqrt(d1_3d_sq + d2_3d_sq)
return dist_2d, dist_2d_err, dist_3d, dist_3d_err
[docs]def find_nearest_ghost(ghost_slice, ghosts_df, radius_scale_factor=100):
""" Find the nearest ghost spot to a given ghost spot and report its distance with its error
This is done using both the 2D and 3D distances.
Parameters
----------
ghost_slice : `pandas.DataFrame`
a ghost spots data frame slice, with one line corresponding to one ghost
ghosts_df : `pandas.DataFrame`
a `pandas` data frame with information on ghost spots data separations and ratios
radius_scale_factor : `float`
a kind of weight for the spot radius to be used in the distance computation
Returns
-------
index_of_min_2d : `int`
the index in the data frame of the nearest ghost for the 2D distance
min_distance_2d : `float`
distance of the given ghost spot to the nearest ghost spot for the 2D distance
min_distance_2d_err : `float`
the uncertainty on the distance with the nearest ghost spot for the 2D distance
index_of_min_3d : `int`
the index in the data frame of the nearest ghost for the 3D distance
min_distance_3d : `float`
distance of the given ghost spot to the nearest ghost spot for the 3D distance
min_distance_3d_err : `float`
the uncertainty on the distance with the nearest ghost spot for the 3D distance
"""
dist_2d_data = []
dist_2d_err_data = []
dist_3d_data = []
dist_3d_err_data = []
n = len(ghosts_df['pos_x'])
for i in range(n):
dist_2d, dist_2d_err, dist_3d, dist_3d_err = \
compute_distance_spot_to_spot(ghost_slice, ghosts_df.xs(i), radius_scale_factor)
dist_2d_data.append(dist_2d)
dist_2d_err_data.append(dist_2d_err)
dist_3d_data.append(dist_3d)
dist_3d_err_data.append(dist_3d_err)
dist_2d_array = np.array(dist_2d_data)
index_of_min_2d = np.argmin(dist_2d_array)
min_distance_2d = dist_2d_array[index_of_min_2d]
min_distance_2d_err = dist_2d_err_data[index_of_min_2d]
dist_3d_array = np.array(dist_3d_data)
index_of_min_3d = np.argmin(dist_3d_array)
min_distance_3d = dist_3d_array[index_of_min_3d]
min_distance_3d_err = dist_3d_err_data[index_of_min_3d]
return index_of_min_2d, min_distance_2d, min_distance_2d_err, \
index_of_min_3d, min_distance_3d, min_distance_3d_err
[docs]def match_ghosts(ghosts_df_1, ghosts_df_2, radius_scale_factor=100):
""" Match ghosts positions from two ghosts data frames
Parameters
----------
ghosts_df_1 : `pandas.DataFrame`
a `pandas` data frame with information on ghost spots data separations and ratios
ghosts_df_2 : `pandas.DataFrame`
a `pandas` data frame with information on ghost spots data separations and ratios
radius_scale_factor : `float`
a kind of weight for the spot radius to be used in the distance computation
Returns
-------
ghosts_match : `pandas.DataFrame`
a `pandas` data frame with the indices of each ghost and nearest ghost, and the distance between the two
"""
match_i1 = []
match_i2_2d = []
match_i2_3d = []
match_min_dist_2d = []
match_min_dist_3d = []
match_min_dist_2d_err = []
match_min_dist_3d_err = []
n = len(ghosts_df_1['pos_x'])
for i in range(n):
index_of_min_2d, min_distance_2d, min_distance_2d_err, \
index_of_min_3d, min_distance_3d, min_distance_3d_err = \
find_nearest_ghost(ghosts_df_1.xs(i), ghosts_df_2, radius_scale_factor)
match_i1.append(i)
# 2D distance
match_i2_2d.append(index_of_min_2d)
match_min_dist_2d.append(min_distance_2d)
match_min_dist_2d_err.append(min_distance_2d_err)
# 3D distance
match_i2_3d.append(index_of_min_3d)
match_min_dist_3d.append(min_distance_3d)
match_min_dist_3d_err.append(min_distance_3d_err)
ghosts_match = pd.DataFrame(
{
"beam_id_1": ghosts_df_1['beam_id'],
"geom_id_1": ghosts_df_1['geom_id'],
"beam_id_2": ghosts_df_2['beam_id'],
"geom_id_2": ghosts_df_2['geom_id'],
"ghost_1": np.array(match_i1, dtype="int"),
"ghost_2_2d": np.array(match_i2_2d, dtype="int"),
"distance_2d": np.array(match_min_dist_2d, dtype="float"),
"distance_2d_err": np.array(match_min_dist_2d_err, dtype="float"),
"ghost_2_3d": np.array(match_i2_3d, dtype="int"),
"distance_3d": np.array(match_min_dist_3d, dtype="float"),
"distance_3d_err": np.array(match_min_dist_3d_err, dtype="float"),
}
)
return ghosts_match
[docs]def compute_reduced_distance(ghosts_match):
""" Compute a kind of reduced distance between two lists of ghosts
.. math::
L = \\frac{\\sqrt{\\sum_{i=1}^{n} \\frac{d(g_{s,i}, g_{r,k_i})^2}{\\sigma_d(g_{s,i}, g_{r,k_i})^2}}}{n}
Parameters
----------
ghosts_match : `pandas.DataFrame`
a data frame with information about matching ghosts spots, see `match_ghosts`
Returns
-------
reduced_distance : `float`
a reduced distance computed as the average of the square root of the sum of squared input distances divided
by the square of the errors on the distance.
"""
n_matches = len(ghosts_match['distance_3d'])
reduced_distance = math.sqrt(sum(ghosts_match['distance_3d'] * ghosts_match['distance_3d'] /
ghosts_match['distance_3d_err'] * ghosts_match['distance_3d_err'])) / n_matches
return reduced_distance
[docs]def compute_2d_reduced_distance(ghosts_match):
""" Compute a simple 2D reduced distance between two lists of ghosts
Parameters
----------
ghosts_match : `pandas.DataFrame`
a data frame with information about matching ghosts spots, see `match_ghosts`
Returns
-------
reduced_distance : `float`
a reduced distance computed as the average of the square root of the sum of squared input distances divided
"""
n_matches = len(ghosts_match['distance_2d'])
reduced_distance = math.sqrt(sum(ghosts_match['distance_2d'] * ghosts_match['distance_2d'] /
ghosts_match['distance_2d_err'] * ghosts_match['distance_2d_err'])) / n_matches
return reduced_distance
[docs]def match_and_dist_2d(ref_df, fit_df):
""" Match ghosts catalogs and compute 2D distance
Parameters
----------
ref_df : `pandas.DataFrame`
reference ghost catalog
fit_df : `pandas.DataFrame`
ghost catalog from the fit
Returns
-------
dist_2d : `double`
2D distance between the two ghosts catalogs
"""
match = match_ghosts(ref_df, fit_df, radius_scale_factor=10)
dist_2d = compute_2d_reduced_distance(match)
return dist_2d
[docs]def compute_uber_distance_2d(spots_df_list, ordered_fit_df):
""" Compute a simple 2D reduced distance between two lists of ghosts
Parameters
----------
spots_df_list : `list[pandas.DataFrame]`
list of data frame of ghosts spots
ordered_fit_df : `list[pandas.DataFrame]`
list of data frame of ghosts spots
Returns
-------
uber_dist : `float`
Uber distance between 2 list of ghosts matches
"""
# now compute distance
dist_list = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(match_and_dist_2d, ref_df, fit_df)
for ref_df, fit_df in zip(spots_df_list, ordered_fit_df)]
for future in concurrent.futures.as_completed(futures):
dist_list.append(future.result())
# compute Uber distance
uber_dist = np.sqrt(np.square(dist_list).sum()) / len(dist_list)
return uber_dist