Matching stars between frames

Lately I've been trying to find algorithms that will match stars between two consecutive CCD images. Needless to say, the task isn't at all easy, and my `naive' implementations have rather costly time complexities. However, with a little tuning, it's possible to get pretty descent results.

To begin, I installed Source Extractor (on Debian and Ubuntu, just type sudo apt-get install sextractor). Then I wrote this Python script to generate NN filters. As an example use, try running the following.
./psf.py --gaussian -i $(echo 3.1 > gaussian.conf && echo gaussian.conf) -s 6 -o gauss.conv && rm gaussian.conf && cat gauss.conv
In the terminal you should see the following.
# Gaussian  convolution filter for params 3.1
0.013196 0.019866 0.024375 0.024375 0.019866 0.013196
0.019866 0.029908 0.036695 0.036695 0.029908 0.019866
0.024375 0.036695 0.045023 0.045023 0.036695 0.024375
0.024375 0.036695 0.045023 0.045023 0.036695 0.024375
0.019866 0.029908 0.036695 0.036695 0.029908 0.019866
0.013196 0.019866 0.024375 0.024375 0.019866 0.013196
Next, after writing up the configuration files and running it on two fields, I've arrived at these two star lists.

Now the problem has been reduced to finding common sources between the former and latter lists; in other words, we're interested in finding an injective (but not necessarily surjective) mapping from the former to the latter list.

To organize and read the data in these lists, I've written the following class and function in Python.
from typing import List, Tuple
from astropy.io.ascii import read


class Source:
    """ 
    Basically a container for holding data about a particular source 
    """
    def __init__(self, NUMBER: int, X_IMAGE: float, Y_IMAGE: float, 
            MAG_BEST: float, MAGERR_AUTO: float, IMAGE_NUMBER: int) -> None:

        self.NUMBER = NUMBER              # number of source in an image
        self.X_IMAGE = X_IMAGE            # x-coordinate in image
        self.Y_IMAGE = Y_IMAGE            # y-coordinate in image
        self.MAG_BEST = MAG_BEST          # best of MAG_AUTO, MAG_ISOCOR from SE
        self.MAGERR_AUTO = MAGERR_AUTO    # RMS error for AUTO magnitude
        self.IMAGE_NUMBER = IMAGE_NUMBER  # image number it appears in

    def get_coordinates(self) -> Tuple[float, float]:
        """
        Get the location of an object (helper function)
        """
        loc = (self.X_IMAGE, self.Y_IMAGE)

        return loc


def read_star_tables(fnames: List[str]) -> List[List[Source]]:
    """
    Map a list of file names to a list of lists with source data.
    
    Parameters
    ----------
    fnames : List[List[str]] 
        List of list of paths to SExtractor output tables containing
        source ID, (x,y)-coordinate, magnitude and magnitude error.
    
    Returns
    -------
    frames : List[List[Source]]
        List of list of sources, encoded as Source objects, in each image.
    """
    frames: List[List[Source]] = []

    for id, fname in enumerate(fnames, start=1):
        # use astropy's amazing `read` function to grab SE tabular data
        data = read(fname).as_array()
        stars = []

        # parse as Source objects
        for src in data:
            values = map(np.asscalar, src)
            star = Source(*values, id)
            stars.append(star)

        frames.append(stars)

    return frames
So the two star tables are now contained in a list of lists of Source objects.

To locate possible matches, I've tried the algorithm described by Stetson (1989). After realizing that it gave far too many false-positives (e.g. see Valdes et. al. (1995)), I tried adding a simple RANSAC algorithm implementation, suggested by PixInsight. In Python, my implementation is as follows.
from math import sqrt
from _exceptions import StarsNotMatchedError, NotEnoughStarsMatchedError
from numpy.linalg import inv, norm
from random import sample
from itertools import combinations
from types import FunctionType as Function
from warnings import warn

import signal
import numpy as np
import os


def _dist(x: Tuple[float, float], y: Tuple[float, float]) -> float:
    """
    Get the Euclidean distance between these two points.
    """
    _x: np.ndarray = np.array(x)
    _y: np.ndarray = np.array(y)
    
    d2 = norm(_x - _y)

    return d2


class Triangle:
    """
    Hold parameter description of a triangle of stars 
    
    Parameters
    ----------
    BA : float
        The ratio of dividing the triangle's intermediate side by its longest.
    CA : float
        The ratio of dividing the triangle's shortest side by its longest.
    src : Source
        The source that lies opposite the triangle's longest side.
    *sides : float
        All the sidelengths, which could be used at some point in 
        :class:`Stetson` for determining if a triangle is too symmetric.
    """
    def __init__(self, BA: float, CA: float, src: Source, *sides: float) -> None:
        self.BA = BA
        self.CA = CA
        self.side_1, self.side_2, self.side_3 = sides
        self.src = src

    @classmethod
    def build(cls, pts: Tuple[Source, Source, Source]) -> 'Triangle':
        # If we only have the Source's
        c_1, c_2, c_3 = pts

        side_1 = _dist(c_1.get_coordinates(), c_2.get_coordinates()) # c_3
        side_2 = _dist(c_2.get_coordinates(), c_3.get_coordinates()) # c_1
        side_3 = _dist(c_3.get_coordinates(), c_1.get_coordinates()) # c_2

        triangle = {side_1: c_3, side_2: c_1, side_3: c_2}

        A, B, C = sorted(triangle.keys())
        maximum_src = triangle[C]

        BA = B / A
        CA = C / A

        t = cls(BA, CA, maximum_src, A, B, C)

        return t

    def __sub__(self, other: 'Triangle') -> float:
        """
        compute the distance from self to other in the invariant 
        (B/A, C/A)-space.
         
        Returns
        -------
        diff : float
            The Euclidean distance between this triangle's BA and CA values
            and another's.
        """

        diff = sqrt((self.BA - other.BA) ** 2 + (self.CA - other.CA) ** 2)

        return diff


class _Transform:
    """ 
    `Pretty print' the transformation equations between frames to the terminal
    and perform transformations using the coefficients.    
    
    Parameters
    ----------
    a11, ..., a23 : float
        The six optimal affine transform parameters.
    """
    def __init__(self, a11: float, a12: float, a13: float, a21: float,
                 a22: float, a23: float) -> None:
        self.a11 = a11
        self.a12 = a12
        self.a13 = a13
        self.a21 = a21
        self.a22 = a22
        self.a23 = a23

    def __str__(self) -> str:
        form = f'u = {self.a11} x1 + {self.a12} y1 + {self.a13}\n' \
               f'v = {self.a21} x1 + {self.a22} y1 + {self.a23}'

        return form

    def __repr__(self):
        return self.__str__()

    def transform(self, c: Tuple[float, float]) -> Tuple[float, float]:
        """ 
        Transform a coordinate.
        """
        x1, y1 = c

        u = self.a11 * x1 + self.a12 * y1 + self.a13
        v = self.a21 * x1 + self.a22 * y1 + self.a23

        new_c = (u, v)

        return new_c


class Correspondence:
    """ 
    Match sources between CCD images that have been extracted with SExtractor.
    
    Parameters
    ----------
    frames : List[List[Source]]
        A list of lists containing the sources in each image (which have been
        encoded as :class:`Source` objects via :func:`read_star_tables`).
    out : str, default 'sources'
        The directory to write star coordinates to
    clobber : bool, default True
        If True, 
    """
    def __init__(self, frames: List[List[Source]], out: str ='sources',
            clobber: bool =True) -> None:
        self.frames = frames
        self._num_frames = len(frames)
        self.out = out      # dir we're writing data to

        # where to write star_*.dat files and store coordinates of each Source
        if not os.path.exists(out):
            os.mkdir(out)
        else:
            # clear out the directory
            for file in os.listdir(out):
                os.remove(out + '/' + file)

    def match(self, tol: float =1e-3, keep: int =25, cutoff: int =20,
              tri_tol: float =0.0, dup: bool =False, ransac_required_tol: int =20,
              ransac_center_tol: float =5, ransac_timeout: int =60) -> List[str]:
        """ 
        Match stars between the list of frames containing lists of 
        :class:`Source`s.
        
        Parameters
        ----------
        tol : float, default 1e-3
            Similarity tolerance level while comparing one :class:`Triangle`
            object with another (with the special method `-`).
        keep : int, default 25
            The number of stars to keep from the current frame and the next
            while looking for similarities.
        cutoff : int, default 20
            Minimum number of similar triangles for two stars to be
            deemed the same.
        tri_tol : float, default (off) 0.0
            Side difference tolerance level while determining if a 
            :class:`Triangle` is too symmetric for use.
        dup : bool, default False
            Filter duplicate mappings that may occur.
        ransac_required_tol : int, default 20 stars
            Number of stars required to match between the two frames with the
            transformation. The algorithm continues until a timeout otherwise.
        ransac_center_tol : float, default 5 pixels
            Circle radius within which stars in the opposite frame must reside
            under the transformation to be deemed a match.
        ransac_timeout : int, default 60 seconds
            Number of seconds after which the RANSAC algorithm is cancelled and
            current results are used.
            
        Returns
        -------
        sources : List[str]
            A list of unique source file names that contain a list of the 
            source's (x,y) pixel location in each frame
            
        Raises
        ------
        StarsNotMatchedError
            If no stars are matched between lists.
        NotEnoughStarsMatchedError
            If less than 3 stars are matched between the two frames.
        """

        sources = []

        for frame in range(self._num_frames - 1):

            ## Stetson's algorithm

            In_i = len(self.frames[frame])
            In_j = len(self.frames[frame + 1])
            In_i_cut = self.frames[frame][:keep]
            In_j_cut = self.frames[frame + 1][:keep]

            matches = np.zeros((In_i, In_j))

            # iterate over all triangles and compare. This may be a slow
            # implementation because we waste time re-computing side lengths
            for c_i in combinations(In_i_cut, r=3):
                t_i = Triangle.build(c_i)
                i = t_i.src.NUMBER - 1

                # check for too much symmetry; we'd like scalene triangles
                if tri_tol and self._symmetry(t_i, tri_tol):
                    continue

                for c_j in combinations(In_j_cut, r=3):
                    t_j = Triangle.build(c_j)
                    j = t_j.src.NUMBER - 1

                    if tri_tol and self._symmetry(t_j, tri_tol):
                        continue

                    if t_i - t_j < tol:
                        # `i` is the star in the former image, `j` is the star
                        # in the latter that we think is the same one
                        matches[i, j] += 1

            matches[np.where(matches < cutoff)] = 0

            if dup:
                matches = self._injectivity(matches)

            matches = list(zip(*np.where(matches > 0)))

            if not matches:
                raise StarsNotMatchedError('No stars matched between frames')

            elif len(matches) < 3:
                raise NotEnoughStarsMatchedError(len(matches))

            matched_sources = []

            for i in range(len(matches)):
                # Source objects
                s1 = self.frames[frame][matches[i][0]]
                s2 = self.frames[frame + 1][matches[i][1]]

                matched_sources.append((s1, s2))

            ## RANSAC

            a = self.solver(matched_sources[:3])
            total = min(In_i, In_j)
            best_match = 3


            class SignalFunctionEndError(Exception): 
                pass


            def handler(*args) -> None:
                raise SignalFunctionEndError


            try:
                # If this finishes, `handler` is never called
                signal.signal(signal.SIGALRM, handler)
                signal.alarm(ransac_timeout)

                print('** Starting RANSAC routine')

                iters = 0
                while best_match < ransac_required_tol:
                    maybe_inliers = sample(matched_sources, k=3)
                    maybe_model = self.solver(maybe_inliers)
                    also_inliers = []

                    for ind, s1 in enumerate(self.frames[frame]):
                        trans = maybe_model.transform(s1.get_coordinates())

                        for s2 in self.frames[frame + 1]:
                            if _dist(trans, s2.get_coordinates()) \
                                    < ransac_center_tol:
                                also_inliers.append((s1, s2))
                                break

                    if len(also_inliers) >= best_match:
                        total_inliers = maybe_inliers + also_inliers
                        this_match = len(total_inliers)
                        better_model = self.solver(total_inliers)

                        if this_match > best_match:
                            a = better_model
                            best_match = this_match

                    iters += 1

            except SignalFunctionEndError:
                warn(
                    f'Terminated RANSAC at {ransac_timeout}s after {iters} iterations'
                )

            finally:
                signal.alarm(0)

            print(f'** Number of matches: {best_match}')

            # loop over all stars in the current frame, and if there's a match
            # in the next, write its coordinates in the next to file
            for s1 in self.frames[frame]:
                nxt = a.transform(s1.get_coordinates())
                
                for s2 in self.frames[frame + 1]:
                    cur = s2.get_coordinates()
                    
                    if _dist(nxt, cur) < ransac_center_tol:
                        # Give matched stars the same NUMBER
                        if s1.NUMBER != s2.NUMBER:
                            for __s2 in self.frames[frame + 1]:
                                if __s2.NUMBER == s1.NUMBER:
                                    # swap
                                    s2.NUMBER, __s2.NUMBER = __s2.NUMBER, s2.NUMBER

                        f = self.out + ('/star_%.3d.dat' % s1.NUMBER)

                        with open(f, 'w' if not frame else 'a') as star_file:
                            if not frame:
                                star_file.write(
                                    f'{s1.IMAGE_NUMBER}\t{s1.X_IMAGE}\t{s1.Y_IMAGE}\n'
                                )

                            star_file.write(
                                f'{s2.IMAGE_NUMBER}\t{cur[0]}\t{cur[1]}\n'
                            )
                        
                        if f not in sources:
                            sources.append(f)
                        
                        break

        return sources

    @staticmethod
    def solver(matched_sources: List[Tuple[Source, Source]]) -> _Transform:
        """
        Use linear regression to arrive at the optimal 6-parameter 
        transformation between the coordinate system in one image and another.
        
        Parameters
        ----------
        matched_sources : List[Tuple[Source, Source]]
            A list of tuples, each containing two sources, one from the 
            current frame and the next.
        
        Returns
        -------
        a : _Transform
            A :class:`_Transformation` object that will transform coordinates
            and print the system to the terminal in a nice format.
        """
        M = np.zeros((6, 6))
        U = np.zeros((3, 3))
        q = np.zeros((6, 1))

        N = len(matched_sources)

        # sum over all the sources' coordinates to build matrices' entries
        for s1, s2 in matched_sources:
            x, y = s1.get_coordinates()
            u, v = s2.get_coordinates()

            # build lower triangular matrix
            U[0, 0] += x * x
            U[1, 0] += x * y
            U[1, 1] += y * y
            U[2, 0] += x
            U[2, 1] += y

            # build vector
            q[0] += u * x
            q[1] += u * y
            q[2] += u
            q[3] += v * x
            q[4] += v * y
            q[5] += v

        # U is symmetric, i.e. U = U^T
        U[2, 2] = N
        U[0, 1] = U[1, 0]
        U[0, 2] = U[2, 0]
        U[1, 2] = U[2, 1]

        # because U is symmetric, so is M; i.e., M = M^T
        M[:3, :3] = U
        M[3:, 3:] = U

        # now we have the optimal transformation coefficients
        a = _Transform(*(inv(M) @ q).T.tolist()[0])

        return a

    @staticmethod
    def _symmetry(t: Triangle, tri_tol: float) -> bool:
        """ 
        Determine if a :class:`Triangle` is too symmetric; a moderately
        significant source of false-positives.
        
        Parameters
        ----------
        t : :class:`Triangle`
            The :class:`Triangle` object to test.
        tri_tol : float
            The tolerance to assume while testing for similarity of sides.
            
        Returns
        -------
        symmetry : bool
            If True, the :class:`Triangle` is too symmetric for use (and hence
            Stetson's algorithm above will ignore it for comparison).
        """
        differences = (
            abs(t.side_1 - t.side_2),
            abs(t.side_2 - t.side_3),
            abs(t.side_3 - t.side_1)
        )

        symmetry = any(diff < tri_tol for diff in differences)

        return symmetry

    @staticmethod
    def _injectivity(arr: np.ndarray) -> np.ndarray:
        """
        Filters double mappings along both row and column to those matches 
        that reside closest to the array's diagonal.
        
        Parameters
        ----------
        arr : np.ndarray
            The array formed in Stetson's algorithm (above) for matching
            star IDs to one another.
            
        Returns
        -------
        arr : np.ndarray
            The same array with duplicates in the horizontal and vertical
            removed.
        """
        _X, _Y = arr.shape

        # linear equations for the diagonals of the star ID array
        slope_X = _X / _Y
        slope_Y = 1 / slope_X

        z1 = lambda x0: slope_X * x0
        z2 = lambda y0: slope_Y * y0

        def rm_dup_max(vec: np.ndarray, lin: Function) -> Tuple[int, float]:
            # if there are duplicate maximums, select that
            # which is closest to the diagonal (basically
            # assumes all the frames were taken sequentially)
            _max, ind = vec.max(), vec.argmax()

            if len(np.where(vec == _max)) > 1:
                _maxes = np.where(vec == _max)
                target = lin(i)
                ind = (_maxes[0][:] - target).argmin()
                _max = vec[ind]

            return ind, _max

        for i in range(_X):
            col = arr[i]
            if np.count_nonzero(col) > 1:
                ind, _max = rm_dup_max(col, z1)
                arr[i, :] = 0
                arr[i, ind] = _max

        for j in range(_Y):
            row = arr[:, j]
            if np.count_nonzero(row) > 1:
                ind, _max = rm_dup_max(row, z2)
                arr[:, j] = 0
                arr[ind, j] = _max

        return arr
The basic idea is to iterate over all the possible triangles that can be formed with nodes at the stars' positions. While a more efficient implementation would generate a triangular (or symmetric) array with all of these distances (as mentioned in the papers I've linked to above), in the source above I've just used itertools.combinations for simplicity. If two triangles are similar enough in a space that's invariant to rotation, dilation and scaling in either direction, then the two stars get a 'vote' in the matches array. A typical array looks as follows.


After running this comparison on a sample of the triangles between the two fields and a background tolerance (bias) has been removed (reduces false positives to some extent), the RANSAC algorithm iterates through small samples (only three points are necessary to arrive at a valid affine transformation between the two lists) until a minimum threshold of matches has been achieved.

Given the two star lists above, I've reduced them to this set of files, each containing a list of coordinates (and the corresponding frame number). The optimal transform between the two lists I've arrived at is as follows, which yields 88 matches.
\begin{align}\begin{bmatrix}u\\v\end{bmatrix}&=\begin{bmatrix}0.999764&-0.000169\\0.000494&1.000358\end{bmatrix}\begin{bmatrix}x\\y\end{bmatrix}+\begin{bmatrix}2.600053\\-0.231103\end{bmatrix}.\end{align}I'd note that, because the telescope and camera combination were the same for both images, and they were taken consecutively, it's no wonder there's hardly any rotation or scaling; however, there certainly appears to have been a multiple pixel drift between exposures.

As far as improvements go, this definitely seems like a problem machine learning could solve. I've been working on using this method for sequences of exposures, but it's possible that the threshold is never met using RANSAC (why I implemented the time limit using signal in the code above). It's also possible that, given enough similarity between the two fields, the algorithm will simply find a sub-optimal solution (in other words, it will get stuck in a 'local optimum').

Okay! Now that I got that off my chest, I have to go work a double shift :P