Matching stars between frames
Lately I've been trying to find algorithms that will match stars between two consecutive CCD images. Needless to say, the task isn't at all easy, and my `naive' implementations have rather costly time complexities. However, with a little tuning, it's possible to get pretty descent results.
To begin, I installed Source Extractor (on Debian and Ubuntu, just type
Now the problem has been reduced to finding common sources between the former and latter lists; in other words, we're interested in finding an injective (but not necessarily surjective) mapping from the former to the latter list.
To organize and read the data in these lists, I've written the following class and function in Python.
To locate possible matches, I've tried the algorithm described by Stetson (1989). After realizing that it gave far too many false-positives (e.g. see Valdes et. al. (1995)), I tried adding a simple RANSAC algorithm implementation, suggested by PixInsight. In Python, my implementation is as follows.
After running this comparison on a sample of the triangles between the two fields and a background tolerance (bias) has been removed (reduces false positives to some extent), the RANSAC algorithm iterates through small samples (only three points are necessary to arrive at a valid affine transformation between the two lists) until a minimum threshold of matches has been achieved.
Given the two star lists above, I've reduced them to this set of files, each containing a list of coordinates (and the corresponding frame number). The optimal transform between the two lists I've arrived at is as follows, which yields 88 matches.
\begin{align}\begin{bmatrix}u\\v\end{bmatrix}&=\begin{bmatrix}0.999764&-0.000169\\0.000494&1.000358\end{bmatrix}\begin{bmatrix}x\\y\end{bmatrix}+\begin{bmatrix}2.600053\\-0.231103\end{bmatrix}.\end{align}I'd note that, because the telescope and camera combination were the same for both images, and they were taken consecutively, it's no wonder there's hardly any rotation or scaling; however, there certainly appears to have been a multiple pixel drift between exposures.
As far as improvements go, this definitely seems like a problem machine learning could solve. I've been working on using this method for sequences of exposures, but it's possible that the threshold is never met using RANSAC (why I implemented the time limit using
Okay! Now that I got that off my chest, I have to go work a double shift :P
To begin, I installed Source Extractor (on Debian and Ubuntu, just type
sudo apt-get install sextractor
). Then I wrote this Python script to generate NN filters. As an example use, try running the following../psf.py --gaussian -i $(echo 3.1 > gaussian.conf && echo gaussian.conf) -s 6 -o gauss.conv && rm gaussian.conf && cat gauss.convIn the terminal you should see the following.
# Gaussian convolution filter for params 3.1 0.013196 0.019866 0.024375 0.024375 0.019866 0.013196 0.019866 0.029908 0.036695 0.036695 0.029908 0.019866 0.024375 0.036695 0.045023 0.045023 0.036695 0.024375 0.024375 0.036695 0.045023 0.045023 0.036695 0.024375 0.019866 0.029908 0.036695 0.036695 0.029908 0.019866 0.013196 0.019866 0.024375 0.024375 0.019866 0.013196Next, after writing up the configuration files and running it on two fields, I've arrived at these two star lists.
Now the problem has been reduced to finding common sources between the former and latter lists; in other words, we're interested in finding an injective (but not necessarily surjective) mapping from the former to the latter list.
To organize and read the data in these lists, I've written the following class and function in Python.
from typing import List, Tuple from astropy.io.ascii import read class Source: """ Basically a container for holding data about a particular source """ def __init__(self, NUMBER: int, X_IMAGE: float, Y_IMAGE: float, MAG_BEST: float, MAGERR_AUTO: float, IMAGE_NUMBER: int) -> None: self.NUMBER = NUMBER # number of source in an image self.X_IMAGE = X_IMAGE # x-coordinate in image self.Y_IMAGE = Y_IMAGE # y-coordinate in image self.MAG_BEST = MAG_BEST # best of MAG_AUTO, MAG_ISOCOR from SE self.MAGERR_AUTO = MAGERR_AUTO # RMS error for AUTO magnitude self.IMAGE_NUMBER = IMAGE_NUMBER # image number it appears in def get_coordinates(self) -> Tuple[float, float]: """ Get the location of an object (helper function) """ loc = (self.X_IMAGE, self.Y_IMAGE) return loc def read_star_tables(fnames: List[str]) -> List[List[Source]]: """ Map a list of file names to a list of lists with source data. Parameters ---------- fnames : List[List[str]] List of list of paths to SExtractor output tables containing source ID, (x,y)-coordinate, magnitude and magnitude error. Returns ------- frames : List[List[Source]] List of list of sources, encoded as Source objects, in each image. """ frames: List[List[Source]] = [] for id, fname in enumerate(fnames, start=1): # use astropy's amazing `read` function to grab SE tabular data data = read(fname).as_array() stars = [] # parse as Source objects for src in data: values = map(np.asscalar, src) star = Source(*values, id) stars.append(star) frames.append(stars) return framesSo the two star tables are now contained in a list of lists of
Source
objects.To locate possible matches, I've tried the algorithm described by Stetson (1989). After realizing that it gave far too many false-positives (e.g. see Valdes et. al. (1995)), I tried adding a simple RANSAC algorithm implementation, suggested by PixInsight. In Python, my implementation is as follows.
from math import sqrt from _exceptions import StarsNotMatchedError, NotEnoughStarsMatchedError from numpy.linalg import inv, norm from random import sample from itertools import combinations from types import FunctionType as Function from warnings import warn import signal import numpy as np import os def _dist(x: Tuple[float, float], y: Tuple[float, float]) -> float: """ Get the Euclidean distance between these two points. """ _x: np.ndarray = np.array(x) _y: np.ndarray = np.array(y) d2 = norm(_x - _y) return d2 class Triangle: """ Hold parameter description of a triangle of stars Parameters ---------- BA : float The ratio of dividing the triangle's intermediate side by its longest. CA : float The ratio of dividing the triangle's shortest side by its longest. src : Source The source that lies opposite the triangle's longest side. *sides : float All the sidelengths, which could be used at some point in :class:`Stetson` for determining if a triangle is too symmetric. """ def __init__(self, BA: float, CA: float, src: Source, *sides: float) -> None: self.BA = BA self.CA = CA self.side_1, self.side_2, self.side_3 = sides self.src = src @classmethod def build(cls, pts: Tuple[Source, Source, Source]) -> 'Triangle': # If we only have the Source's c_1, c_2, c_3 = pts side_1 = _dist(c_1.get_coordinates(), c_2.get_coordinates()) # c_3 side_2 = _dist(c_2.get_coordinates(), c_3.get_coordinates()) # c_1 side_3 = _dist(c_3.get_coordinates(), c_1.get_coordinates()) # c_2 triangle = {side_1: c_3, side_2: c_1, side_3: c_2} A, B, C = sorted(triangle.keys()) maximum_src = triangle[C] BA = B / A CA = C / A t = cls(BA, CA, maximum_src, A, B, C) return t def __sub__(self, other: 'Triangle') -> float: """ compute the distance from self to other in the invariant (B/A, C/A)-space. Returns ------- diff : float The Euclidean distance between this triangle's BA and CA values and another's. """ diff = sqrt((self.BA - other.BA) ** 2 + (self.CA - other.CA) ** 2) return diff class _Transform: """ `Pretty print' the transformation equations between frames to the terminal and perform transformations using the coefficients. Parameters ---------- a11, ..., a23 : float The six optimal affine transform parameters. """ def __init__(self, a11: float, a12: float, a13: float, a21: float, a22: float, a23: float) -> None: self.a11 = a11 self.a12 = a12 self.a13 = a13 self.a21 = a21 self.a22 = a22 self.a23 = a23 def __str__(self) -> str: form = f'u = {self.a11} x1 + {self.a12} y1 + {self.a13}\n' \ f'v = {self.a21} x1 + {self.a22} y1 + {self.a23}' return form def __repr__(self): return self.__str__() def transform(self, c: Tuple[float, float]) -> Tuple[float, float]: """ Transform a coordinate. """ x1, y1 = c u = self.a11 * x1 + self.a12 * y1 + self.a13 v = self.a21 * x1 + self.a22 * y1 + self.a23 new_c = (u, v) return new_c class Correspondence: """ Match sources between CCD images that have been extracted with SExtractor. Parameters ---------- frames : List[List[Source]] A list of lists containing the sources in each image (which have been encoded as :class:`Source` objects via :func:`read_star_tables`). out : str, default 'sources' The directory to write star coordinates to clobber : bool, default True If True, """ def __init__(self, frames: List[List[Source]], out: str ='sources', clobber: bool =True) -> None: self.frames = frames self._num_frames = len(frames) self.out = out # dir we're writing data to # where to write star_*.dat files and store coordinates of each Source if not os.path.exists(out): os.mkdir(out) else: # clear out the directory for file in os.listdir(out): os.remove(out + '/' + file) def match(self, tol: float =1e-3, keep: int =25, cutoff: int =20, tri_tol: float =0.0, dup: bool =False, ransac_required_tol: int =20, ransac_center_tol: float =5, ransac_timeout: int =60) -> List[str]: """ Match stars between the list of frames containing lists of :class:`Source`s. Parameters ---------- tol : float, default 1e-3 Similarity tolerance level while comparing one :class:`Triangle` object with another (with the special method `-`). keep : int, default 25 The number of stars to keep from the current frame and the next while looking for similarities. cutoff : int, default 20 Minimum number of similar triangles for two stars to be deemed the same. tri_tol : float, default (off) 0.0 Side difference tolerance level while determining if a :class:`Triangle` is too symmetric for use. dup : bool, default False Filter duplicate mappings that may occur. ransac_required_tol : int, default 20 stars Number of stars required to match between the two frames with the transformation. The algorithm continues until a timeout otherwise. ransac_center_tol : float, default 5 pixels Circle radius within which stars in the opposite frame must reside under the transformation to be deemed a match. ransac_timeout : int, default 60 seconds Number of seconds after which the RANSAC algorithm is cancelled and current results are used. Returns ------- sources : List[str] A list of unique source file names that contain a list of the source's (x,y) pixel location in each frame Raises ------ StarsNotMatchedError If no stars are matched between lists. NotEnoughStarsMatchedError If less than 3 stars are matched between the two frames. """ sources = [] for frame in range(self._num_frames - 1): ## Stetson's algorithm In_i = len(self.frames[frame]) In_j = len(self.frames[frame + 1]) In_i_cut = self.frames[frame][:keep] In_j_cut = self.frames[frame + 1][:keep] matches = np.zeros((In_i, In_j)) # iterate over all triangles and compare. This may be a slow # implementation because we waste time re-computing side lengths for c_i in combinations(In_i_cut, r=3): t_i = Triangle.build(c_i) i = t_i.src.NUMBER - 1 # check for too much symmetry; we'd like scalene triangles if tri_tol and self._symmetry(t_i, tri_tol): continue for c_j in combinations(In_j_cut, r=3): t_j = Triangle.build(c_j) j = t_j.src.NUMBER - 1 if tri_tol and self._symmetry(t_j, tri_tol): continue if t_i - t_j < tol: # `i` is the star in the former image, `j` is the star # in the latter that we think is the same one matches[i, j] += 1 matches[np.where(matches < cutoff)] = 0 if dup: matches = self._injectivity(matches) matches = list(zip(*np.where(matches > 0))) if not matches: raise StarsNotMatchedError('No stars matched between frames') elif len(matches) < 3: raise NotEnoughStarsMatchedError(len(matches)) matched_sources = [] for i in range(len(matches)): # Source objects s1 = self.frames[frame][matches[i][0]] s2 = self.frames[frame + 1][matches[i][1]] matched_sources.append((s1, s2)) ## RANSAC a = self.solver(matched_sources[:3]) total = min(In_i, In_j) best_match = 3 class SignalFunctionEndError(Exception): pass def handler(*args) -> None: raise SignalFunctionEndError try: # If this finishes, `handler` is never called signal.signal(signal.SIGALRM, handler) signal.alarm(ransac_timeout) print('** Starting RANSAC routine') iters = 0 while best_match < ransac_required_tol: maybe_inliers = sample(matched_sources, k=3) maybe_model = self.solver(maybe_inliers) also_inliers = [] for ind, s1 in enumerate(self.frames[frame]): trans = maybe_model.transform(s1.get_coordinates()) for s2 in self.frames[frame + 1]: if _dist(trans, s2.get_coordinates()) \ < ransac_center_tol: also_inliers.append((s1, s2)) break if len(also_inliers) >= best_match: total_inliers = maybe_inliers + also_inliers this_match = len(total_inliers) better_model = self.solver(total_inliers) if this_match > best_match: a = better_model best_match = this_match iters += 1 except SignalFunctionEndError: warn( f'Terminated RANSAC at {ransac_timeout}s after {iters} iterations' ) finally: signal.alarm(0) print(f'** Number of matches: {best_match}') # loop over all stars in the current frame, and if there's a match # in the next, write its coordinates in the next to file for s1 in self.frames[frame]: nxt = a.transform(s1.get_coordinates()) for s2 in self.frames[frame + 1]: cur = s2.get_coordinates() if _dist(nxt, cur) < ransac_center_tol: # Give matched stars the same NUMBER if s1.NUMBER != s2.NUMBER: for __s2 in self.frames[frame + 1]: if __s2.NUMBER == s1.NUMBER: # swap s2.NUMBER, __s2.NUMBER = __s2.NUMBER, s2.NUMBER f = self.out + ('/star_%.3d.dat' % s1.NUMBER) with open(f, 'w' if not frame else 'a') as star_file: if not frame: star_file.write( f'{s1.IMAGE_NUMBER}\t{s1.X_IMAGE}\t{s1.Y_IMAGE}\n' ) star_file.write( f'{s2.IMAGE_NUMBER}\t{cur[0]}\t{cur[1]}\n' ) if f not in sources: sources.append(f) break return sources @staticmethod def solver(matched_sources: List[Tuple[Source, Source]]) -> _Transform: """ Use linear regression to arrive at the optimal 6-parameter transformation between the coordinate system in one image and another. Parameters ---------- matched_sources : List[Tuple[Source, Source]] A list of tuples, each containing two sources, one from the current frame and the next. Returns ------- a : _Transform A :class:`_Transformation` object that will transform coordinates and print the system to the terminal in a nice format. """ M = np.zeros((6, 6)) U = np.zeros((3, 3)) q = np.zeros((6, 1)) N = len(matched_sources) # sum over all the sources' coordinates to build matrices' entries for s1, s2 in matched_sources: x, y = s1.get_coordinates() u, v = s2.get_coordinates() # build lower triangular matrix U[0, 0] += x * x U[1, 0] += x * y U[1, 1] += y * y U[2, 0] += x U[2, 1] += y # build vector q[0] += u * x q[1] += u * y q[2] += u q[3] += v * x q[4] += v * y q[5] += v # U is symmetric, i.e. U = U^T U[2, 2] = N U[0, 1] = U[1, 0] U[0, 2] = U[2, 0] U[1, 2] = U[2, 1] # because U is symmetric, so is M; i.e., M = M^T M[:3, :3] = U M[3:, 3:] = U # now we have the optimal transformation coefficients a = _Transform(*(inv(M) @ q).T.tolist()[0]) return a @staticmethod def _symmetry(t: Triangle, tri_tol: float) -> bool: """ Determine if a :class:`Triangle` is too symmetric; a moderately significant source of false-positives. Parameters ---------- t : :class:`Triangle` The :class:`Triangle` object to test. tri_tol : float The tolerance to assume while testing for similarity of sides. Returns ------- symmetry : bool If True, the :class:`Triangle` is too symmetric for use (and hence Stetson's algorithm above will ignore it for comparison). """ differences = ( abs(t.side_1 - t.side_2), abs(t.side_2 - t.side_3), abs(t.side_3 - t.side_1) ) symmetry = any(diff < tri_tol for diff in differences) return symmetry @staticmethod def _injectivity(arr: np.ndarray) -> np.ndarray: """ Filters double mappings along both row and column to those matches that reside closest to the array's diagonal. Parameters ---------- arr : np.ndarray The array formed in Stetson's algorithm (above) for matching star IDs to one another. Returns ------- arr : np.ndarray The same array with duplicates in the horizontal and vertical removed. """ _X, _Y = arr.shape # linear equations for the diagonals of the star ID array slope_X = _X / _Y slope_Y = 1 / slope_X z1 = lambda x0: slope_X * x0 z2 = lambda y0: slope_Y * y0 def rm_dup_max(vec: np.ndarray, lin: Function) -> Tuple[int, float]: # if there are duplicate maximums, select that # which is closest to the diagonal (basically # assumes all the frames were taken sequentially) _max, ind = vec.max(), vec.argmax() if len(np.where(vec == _max)) > 1: _maxes = np.where(vec == _max) target = lin(i) ind = (_maxes[0][:] - target).argmin() _max = vec[ind] return ind, _max for i in range(_X): col = arr[i] if np.count_nonzero(col) > 1: ind, _max = rm_dup_max(col, z1) arr[i, :] = 0 arr[i, ind] = _max for j in range(_Y): row = arr[:, j] if np.count_nonzero(row) > 1: ind, _max = rm_dup_max(row, z2) arr[:, j] = 0 arr[ind, j] = _max return arrThe basic idea is to iterate over all the possible triangles that can be formed with nodes at the stars' positions. While a more efficient implementation would generate a triangular (or symmetric) array with all of these distances (as mentioned in the papers I've linked to above), in the source above I've just used
itertools.combinations
for simplicity. If two triangles are similar enough in a space that's invariant to rotation, dilation and scaling in either direction, then the two stars get a 'vote' in the matches
array. A typical array looks as follows.After running this comparison on a sample of the triangles between the two fields and a background tolerance (bias) has been removed (reduces false positives to some extent), the RANSAC algorithm iterates through small samples (only three points are necessary to arrive at a valid affine transformation between the two lists) until a minimum threshold of matches has been achieved.
Given the two star lists above, I've reduced them to this set of files, each containing a list of coordinates (and the corresponding frame number). The optimal transform between the two lists I've arrived at is as follows, which yields 88 matches.
\begin{align}\begin{bmatrix}u\\v\end{bmatrix}&=\begin{bmatrix}0.999764&-0.000169\\0.000494&1.000358\end{bmatrix}\begin{bmatrix}x\\y\end{bmatrix}+\begin{bmatrix}2.600053\\-0.231103\end{bmatrix}.\end{align}I'd note that, because the telescope and camera combination were the same for both images, and they were taken consecutively, it's no wonder there's hardly any rotation or scaling; however, there certainly appears to have been a multiple pixel drift between exposures.
As far as improvements go, this definitely seems like a problem machine learning could solve. I've been working on using this method for sequences of exposures, but it's possible that the threshold is never met using RANSAC (why I implemented the time limit using
signal
in the code above). It's also possible that, given enough similarity between the two fields, the algorithm will simply find a sub-optimal solution (in other words, it will get stuck in a 'local optimum').Okay! Now that I got that off my chest, I have to go work a double shift :P