### Matching stars between frames

Lately I've been trying to find algorithms that will match stars between two consecutive CCD images. Needless to say, the task isn't at all easy, and my naive' implementations have rather costly time complexities. However, with a little tuning, it's possible to get pretty decent results.

To begin, I installed Source Extractor (on Debian and Ubuntu, just type sudo apt-get install sextractor). Then I wrote this Python script to generate NN filters. As an example use, try running the following.
./psf.py --gaussian -i \$(echo 3.1 > gaussian.conf && echo gaussian.conf) -s 6 -o gauss.conv && rm gaussian.conf && cat gauss.conv

In the terminal you should see the following.
# Gaussian  convolution filter for params 3.1
0.013196 0.019866 0.024375 0.024375 0.019866 0.013196
0.019866 0.029908 0.036695 0.036695 0.029908 0.019866
0.024375 0.036695 0.045023 0.045023 0.036695 0.024375
0.024375 0.036695 0.045023 0.045023 0.036695 0.024375
0.019866 0.029908 0.036695 0.036695 0.029908 0.019866
0.013196 0.019866 0.024375 0.024375 0.019866 0.013196

Next, after writing up the configuration files and running it on two fields, I've arrived at these two star lists.

Now the problem has been reduced to finding common sources between the former and latter lists; in other words, we're interested in finding an injective (but not necessarily surjective) mapping from the former to the latter list.

To organize and read the data in these lists, I've written the following class and function in Python.
from typing import List, Tuple

class Source:
"""
Basically a container for holding data about a particular source
"""
def __init__(self, NUMBER: int, X_IMAGE: float, Y_IMAGE: float,
MAG_BEST: float, MAGERR_AUTO: float, IMAGE_NUMBER: int) -> None:

self.NUMBER = NUMBER              # number of source in an image
self.X_IMAGE = X_IMAGE            # x-coordinate in image
self.Y_IMAGE = Y_IMAGE            # y-coordinate in image
self.MAG_BEST = MAG_BEST          # best of MAG_AUTO, MAG_ISOCOR from SE
self.MAGERR_AUTO = MAGERR_AUTO    # RMS error for AUTO magnitude
self.IMAGE_NUMBER = IMAGE_NUMBER  # image number it appears in

def get_coordinates(self) -> Tuple[float, float]:
"""
Get the location of an object (helper function)
"""
loc = (self.X_IMAGE, self.Y_IMAGE)

return loc

"""
Map a list of file names to a list of lists with source data.

Parameters
----------
fnames : List[List[str]]
List of list of paths to SExtractor output tables containing
source ID, (x,y)-coordinate, magnitude and magnitude error.

Returns
-------
frames : List[List[Source]]
List of list of sources, encoded as Source objects, in each image.
"""
frames: List[List[Source]] = []

for id, fname in enumerate(fnames, start=1):
# use astropy's amazing read function to grab SE tabular data
stars = []

# parse as Source objects
for src in data:
values = map(np.asscalar, src)
star = Source(*values, id)
stars.append(star)

frames.append(stars)

return frames

So the two star tables are now contained in a list of lists of Source objects.

To locate possible matches, I've tried the algorithm described by Stetson (1989). After realizing that it gave far too many false-positives (e.g. see Valdes et. al. (1995)), I tried adding a simple RANSAC algorithm implementation, suggested by PixInsight. In Python, my implementation is as follows.
from math import sqrt
from _exceptions import StarsNotMatchedError, NotEnoughStarsMatchedError
from numpy.linalg import inv, norm
from random import sample
from itertools import combinations
from types import FunctionType as Function
from warnings import warn

import signal
import numpy as np
import os

def _dist(x: Tuple[float, float], y: Tuple[float, float]) -> float:
"""
Get the Euclidean distance between these two points.
"""
_x: np.ndarray = np.array(x)
_y: np.ndarray = np.array(y)

d2 = norm(_x - _y)

return d2

class Triangle:
"""
Hold parameter description of a triangle of stars

Parameters
----------
BA : float
The ratio of dividing the triangle's intermediate side by its longest.
CA : float
The ratio of dividing the triangle's shortest side by its longest.
src : Source
The source that lies opposite the triangle's longest side.
*sides : float
All the sidelengths, which could be used at some point in
:class:Stetson for determining if a triangle is too symmetric.
"""
def __init__(self, BA: float, CA: float, src: Source, *sides: float) -> None:
self.BA = BA
self.CA = CA
self.side_1, self.side_2, self.side_3 = sides
self.src = src

@classmethod
def build(cls, pts: Tuple[Source, Source, Source]) -> 'Triangle':
# If we only have the Source's
c_1, c_2, c_3 = pts

side_1 = _dist(c_1.get_coordinates(), c_2.get_coordinates()) # c_3
side_2 = _dist(c_2.get_coordinates(), c_3.get_coordinates()) # c_1
side_3 = _dist(c_3.get_coordinates(), c_1.get_coordinates()) # c_2

triangle = {side_1: c_3, side_2: c_1, side_3: c_2}

A, B, C = sorted(triangle.keys())
maximum_src = triangle[C]

BA = B / A
CA = C / A

t = cls(BA, CA, maximum_src, A, B, C)

return t

def __sub__(self, other: 'Triangle') -> float:
"""
compute the distance from self to other in the invariant
(B/A, C/A)-space.

Returns
-------
diff : float
The Euclidean distance between this triangle's BA and CA values
and another's.
"""

diff = sqrt((self.BA - other.BA) ** 2 + (self.CA - other.CA) ** 2)

return diff

class _Transform:
"""
Pretty print' the transformation equations between frames to the terminal
and perform transformations using the coefficients.

Parameters
----------
a11, ..., a23 : float
The six optimal affine transform parameters.
"""
def __init__(self, a11: float, a12: float, a13: float, a21: float,
a22: float, a23: float) -> None:
self.a11 = a11
self.a12 = a12
self.a13 = a13
self.a21 = a21
self.a22 = a22
self.a23 = a23

def __str__(self) -> str:
form = f'u = {self.a11} x1 + {self.a12} y1 + {self.a13}\n' \
f'v = {self.a21} x1 + {self.a22} y1 + {self.a23}'

return form

def __repr__(self):
return self.__str__()

def transform(self, c: Tuple[float, float]) -> Tuple[float, float]:
"""
Transform a coordinate.
"""
x1, y1 = c

u = self.a11 * x1 + self.a12 * y1 + self.a13
v = self.a21 * x1 + self.a22 * y1 + self.a23

new_c = (u, v)

return new_c

class Correspondence:
"""
Match sources between CCD images that have been extracted with SExtractor.

Parameters
----------
frames : List[List[Source]]
A list of lists containing the sources in each image (which have been
encoded as :class:Source objects via :func:read_star_tables).
out : str, default 'sources'
The directory to write star coordinates to
clobber : bool, default True
If True,
"""
def __init__(self, frames: List[List[Source]], out: str ='sources',
clobber: bool =True) -> None:
self.frames = frames
self._num_frames = len(frames)
self.out = out      # dir we're writing data to

# where to write star_*.dat files and store coordinates of each Source
if not os.path.exists(out):
os.mkdir(out)
else:
# clear out the directory
for file in os.listdir(out):
os.remove(out + '/' + file)

def match(self, tol: float =1e-3, keep: int =25, cutoff: int =20,
tri_tol: float =0.0, dup: bool =False, ransac_required_tol: int =20,
ransac_center_tol: float =5, ransac_timeout: int =60) -> List[str]:
"""
Match stars between the list of frames containing lists of
:class:Sources.

Parameters
----------
tol : float, default 1e-3
Similarity tolerance level while comparing one :class:Triangle
object with another (with the special method -).
keep : int, default 25
The number of stars to keep from the current frame and the next
while looking for similarities.
cutoff : int, default 20
Minimum number of similar triangles for two stars to be
deemed the same.
tri_tol : float, default (off) 0.0
Side difference tolerance level while determining if a
:class:Triangle is too symmetric for use.
dup : bool, default False
Filter duplicate mappings that may occur.
ransac_required_tol : int, default 20 stars
Number of stars required to match between the two frames with the
transformation. The algorithm continues until a timeout otherwise.
ransac_center_tol : float, default 5 pixels
Circle radius within which stars in the opposite frame must reside
under the transformation to be deemed a match.
ransac_timeout : int, default 60 seconds
Number of seconds after which the RANSAC algorithm is cancelled and
current results are used.

Returns
-------
sources : List[str]
A list of unique source file names that contain a list of the
source's (x,y) pixel location in each frame

Raises
------
StarsNotMatchedError
If no stars are matched between lists.
NotEnoughStarsMatchedError
If less than 3 stars are matched between the two frames.
"""

sources = []

for frame in range(self._num_frames - 1):

## Stetson's algorithm

In_i = len(self.frames[frame])
In_j = len(self.frames[frame + 1])
In_i_cut = self.frames[frame][:keep]
In_j_cut = self.frames[frame + 1][:keep]

matches = np.zeros((In_i, In_j))

# iterate over all triangles and compare. This may be a slow
# implementation because we waste time re-computing side lengths
for c_i in combinations(In_i_cut, r=3):
t_i = Triangle.build(c_i)
i = t_i.src.NUMBER - 1

# check for too much symmetry; we'd like scalene triangles
if tri_tol and self._symmetry(t_i, tri_tol):
continue

for c_j in combinations(In_j_cut, r=3):
t_j = Triangle.build(c_j)
j = t_j.src.NUMBER - 1

if tri_tol and self._symmetry(t_j, tri_tol):
continue

if t_i - t_j < tol:
# i is the star in the former image, j is the star
# in the latter that we think is the same one
matches[i, j] += 1

matches[np.where(matches < cutoff)] = 0

if dup:
matches = self._injectivity(matches)

matches = list(zip(*np.where(matches > 0)))

if not matches:
raise StarsNotMatchedError('No stars matched between frames')

elif len(matches) < 3:
raise NotEnoughStarsMatchedError(len(matches))

matched_sources = []

for i in range(len(matches)):
# Source objects
s1 = self.frames[frame][matches[i][0]]
s2 = self.frames[frame + 1][matches[i][1]]

matched_sources.append((s1, s2))

## RANSAC

a = self.solver(matched_sources[:3])
total = min(In_i, In_j)
best_match = 3

class SignalFunctionEndError(Exception):
pass

def handler(*args) -> None:
raise SignalFunctionEndError

try:
# If this finishes, handler is never called
signal.signal(signal.SIGALRM, handler)
signal.alarm(ransac_timeout)

print('** Starting RANSAC routine')

iters = 0
while best_match < ransac_required_tol:
maybe_inliers = sample(matched_sources, k=3)
maybe_model = self.solver(maybe_inliers)
also_inliers = []

for ind, s1 in enumerate(self.frames[frame]):
trans = maybe_model.transform(s1.get_coordinates())

for s2 in self.frames[frame + 1]:
if _dist(trans, s2.get_coordinates()) \
< ransac_center_tol:
also_inliers.append((s1, s2))
break

if len(also_inliers) >= best_match:
total_inliers = maybe_inliers + also_inliers
this_match = len(total_inliers)
better_model = self.solver(total_inliers)

if this_match > best_match:
a = better_model
best_match = this_match

iters += 1

except SignalFunctionEndError:
warn(
f'Terminated RANSAC at {ransac_timeout}s after {iters} iterations'
)

finally:
signal.alarm(0)

print(f'** Number of matches: {best_match}')

# loop over all stars in the current frame, and if there's a match
# in the next, write its coordinates in the next to file
for s1 in self.frames[frame]:
nxt = a.transform(s1.get_coordinates())

for s2 in self.frames[frame + 1]:
cur = s2.get_coordinates()

if _dist(nxt, cur) < ransac_center_tol:
# Give matched stars the same NUMBER
if s1.NUMBER != s2.NUMBER:
for __s2 in self.frames[frame + 1]:
if __s2.NUMBER == s1.NUMBER:
# swap
s2.NUMBER, __s2.NUMBER = __s2.NUMBER, s2.NUMBER

f = self.out + ('/star_%.3d.dat' % s1.NUMBER)

with open(f, 'w' if not frame else 'a') as star_file:
if not frame:
star_file.write(
f'{s1.IMAGE_NUMBER}\t{s1.X_IMAGE}\t{s1.Y_IMAGE}\n'
)

star_file.write(
f'{s2.IMAGE_NUMBER}\t{cur[0]}\t{cur[1]}\n'
)

if f not in sources:
sources.append(f)

break

return sources

@staticmethod
def solver(matched_sources: List[Tuple[Source, Source]]) -> _Transform:
"""
Use linear regression to arrive at the optimal 6-parameter
transformation between the coordinate system in one image and another.

Parameters
----------
matched_sources : List[Tuple[Source, Source]]
A list of tuples, each containing two sources, one from the
current frame and the next.

Returns
-------
a : _Transform
A :class:_Transformation object that will transform coordinates
and print the system to the terminal in a nice format.
"""
M = np.zeros((6, 6))
U = np.zeros((3, 3))
q = np.zeros((6, 1))

N = len(matched_sources)

# sum over all the sources' coordinates to build matrices' entries
for s1, s2 in matched_sources:
x, y = s1.get_coordinates()
u, v = s2.get_coordinates()

# build lower triangular matrix
U[0, 0] += x * x
U[1, 0] += x * y
U[1, 1] += y * y
U[2, 0] += x
U[2, 1] += y

# build vector
q[0] += u * x
q[1] += u * y
q[2] += u
q[3] += v * x
q[4] += v * y
q[5] += v

# U is symmetric, i.e. U = U^T
U[2, 2] = N
U[0, 1] = U[1, 0]
U[0, 2] = U[2, 0]
U[1, 2] = U[2, 1]

# because U is symmetric, so is M; i.e., M = M^T
M[:3, :3] = U
M[3:, 3:] = U

# now we have the optimal transformation coefficients
a = _Transform(*(inv(M) @ q).T.tolist()[0])

return a

@staticmethod
def _symmetry(t: Triangle, tri_tol: float) -> bool:
"""
Determine if a :class:Triangle is too symmetric; a moderately
significant source of false-positives.

Parameters
----------
t : :class:Triangle
The :class:Triangle object to test.
tri_tol : float
The tolerance to assume while testing for similarity of sides.

Returns
-------
symmetry : bool
If True, the :class:Triangle is too symmetric for use (and hence
Stetson's algorithm above will ignore it for comparison).
"""
differences = (
abs(t.side_1 - t.side_2),
abs(t.side_2 - t.side_3),
abs(t.side_3 - t.side_1)
)

symmetry = any(diff < tri_tol for diff in differences)

return symmetry

@staticmethod
def _injectivity(arr: np.ndarray) -> np.ndarray:
"""
Filters double mappings along both row and column to those matches
that reside closest to the array's diagonal.

Parameters
----------
arr : np.ndarray
The array formed in Stetson's algorithm (above) for matching
star IDs to one another.

Returns
-------
arr : np.ndarray
The same array with duplicates in the horizontal and vertical
removed.
"""
_X, _Y = arr.shape

# linear equations for the diagonals of the star ID array
slope_X = _X / _Y
slope_Y = 1 / slope_X

z1 = lambda x0: slope_X * x0
z2 = lambda y0: slope_Y * y0

def rm_dup_max(vec: np.ndarray, lin: Function) -> Tuple[int, float]:
# if there are duplicate maximums, select that
# which is closest to the diagonal (basically
# assumes all the frames were taken sequentially)
_max, ind = vec.max(), vec.argmax()

if len(np.where(vec == _max)) > 1:
_maxes = np.where(vec == _max)
target = lin(i)
ind = (_maxes[0][:] - target).argmin()
_max = vec[ind]

return ind, _max

for i in range(_X):
col = arr[i]
if np.count_nonzero(col) > 1:
ind, _max = rm_dup_max(col, z1)
arr[i, :] = 0
arr[i, ind] = _max

for j in range(_Y):
row = arr[:, j]
if np.count_nonzero(row) > 1:
ind, _max = rm_dup_max(row, z2)
arr[:, j] = 0
arr[ind, j] = _max

return arr

The basic idea is to iterate over all the possible triangles that can be formed with nodes at the stars' positions. While a more efficient implementation would generate a triangular (or symmetric) array with all of these distances (as mentioned in the papers I've linked to above), in the source above I've just used itertools.combinations for simplicity. If two triangles are similar enough in a space that's invariant to rotation, dilation and scaling in either direction, then the two stars get a 'vote' in the matches array. A typical array looks as follows.

After running this comparison on a sample of the triangles between the two fields and a background tolerance (bias) has been removed (reduces false positives to some extent), the RANSAC algorithm iterates through small samples (only three points are necessary to arrive at a valid affine transformation between the two lists) until a minimum threshold of matches has been achieved.

Given the two star lists above, I've reduced them to this set of files, each containing a list of coordinates (and the corresponding frame number). The optimal transform between the two lists I've arrived at is as follows, which yields 88 matches.
\begin{align}\begin{bmatrix}u\\v\end{bmatrix}&=\begin{bmatrix}0.999764&-0.000169\\0.000494&1.000358\end{bmatrix}\begin{bmatrix}x\\y\end{bmatrix}+\begin{bmatrix}2.600053\\-0.231103\end{bmatrix}.\end{align}I'd note that, because the telescope and camera combination were the same for both images, and they were taken consecutively, it's no wonder there's hardly any rotation or scaling; however, there certainly appears to have been a multiple pixel drift between exposures.

As far as improvements go, this definitely seems like a problem machine learning could solve. I've been working on using this method for sequences of exposures, but it's possible that the threshold is never met using RANSAC (why I implemented the time limit using signal in the code above). It's also possible that, given enough similarity between the two fields, the algorithm will simply find a sub-optimal solution (in other words, it will get stuck in a 'local optimum').

Okay! Now that I got that off my chest, I have to go work a double shift :P