Skip to content


streamline auto-stitch, improve mse searching
Browse files Browse the repository at this point in the history
- fewer redraws during mse searching to improve performance
  (could eliminate all / multithread it but I think for now the
  animations are insightful and fun)
- auto-stitch now has you work through the left and top edges first,
  then runs unattended (instead of stopping every row for a check of
  the top stitch).
  • Loading branch information
bunnie committed Feb 4, 2024
1 parent beea8cd commit 502a13c
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 69 deletions.
15 changes: 15 additions & 0 deletions
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# ------ global configs
X_RES = 3840
Y_RES = 2160
PIX_PER_UM_20X = 3535 / 370 # 20x objective
Expand All @@ -22,6 +23,20 @@

PIEZO_UM_PER_LSB = (1/193.5) # from empirical measurements

# ----- configure template matching
# low scores are better. scores greater than this fail.
CONTOUR_THRESH = 192 # 192 for well-focused images; 224 if the focus quality is poor
# maximum number of potential solutions before falling back to manual review
SEARCH_SCALE = 0.80 # 0.8 worked on the AW set, 0.9 if using a square template

# snippet for a parser script (json-to-csv) for the piezo cal data
import json
Expand Down
5 changes: 0 additions & 5 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
from utils import *
from config import *

# - split stitch into setup/auto phases
# - setup aligns all the edge bits manually
# - auto runs all stitching without check requests

# Coordinate system of OpenCV and X/Y on machine:
# (0,0) ----> X
Expand Down
178 changes: 114 additions & 64 deletions
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,6 @@
import threading
from config import *

# low scores are better. scores greater than this fail.
CONTOUR_THRESH = 192 # 192 for well-focused images; 224 if the focus quality is poor
# maximum number of potential solutions before falling back to manual review
SEARCH_SCALE = 0.80 # 0.8 worked on the AW set, 0.9 if using a square template

# Usage:
# Create the Profiler() object at the point where you want benchmarking to start
Expand Down Expand Up @@ -416,18 +405,19 @@ def mse_cleanup(self, picked):
was_coarse = True
mse_search = {
'current' : None,
'n' : None,
'w' : None,
'e' : None,
's' : None,
# this should be last so that the final corr/log_mse value can be recalled on the last iteration
'current' : None,
offsets = {
'current': (0, 0),
'n': (0, -1),
'w': (-1, 0),
'e': (1, 0),
's': (0, 1),
'current': (0, 0),
coarse_offsets = {
'current': (0, 0),
Expand All @@ -437,35 +427,48 @@ def mse_cleanup(self, picked):
's': (0, 2*ceil(Schema.PIX_PER_UM)),
steps = 1
starting_point = self.match_pt(picked)
current_point = self.match_pt(picked)
starting_point = current_point
corr = None
log_mse = None
key = -1
while step != 'DONE':
if steps >= MSE_SEARCH_LIMIT: # terminate if a limit cycle seems to be reached
logging.error("Step limit exceeded! aborting MSE cleanup.")
self.update_match_pt(picked, starting_point)
(corr, log_mse) = self.compute_mse(picked)
logging.debug(f"mse {log_mse}")
self.show_mse(picked, (corr, log_mse), console=False, draw=False)
cv2.waitKey() # force a redraw & pause
return steps
if key != -1:
# abort due to keystroke: return a negative steps closure result
return -1
if step == 'SEARCH' or step == 'SEARCH_COARSE':
for direction in mse_search.keys():
if step == 'SEARCH':
offset = offsets[direction]
offset = coarse_offsets[direction]
test_point = (starting_point[0] + offset[0], starting_point[1] + offset[1])
test_point = (current_point[0] + offset[0], current_point[1] + offset[1])
self.update_match_pt(picked, test_point)
(corr, log_mse) = self.compute_mse(picked)
logging.debug(f"mse {log_mse}")
self.show_mse(picked, (corr, log_mse), console=False)
cv2.waitKey(1) # force a redraw
mse_search[direction] = log_mse
step = 'EVAL_SEARCH'
elif step == 'EVAL_SEARCH':
sorted_results = sorted(mse_search.items(), key=lambda kv: kv[1])
best_direction = sorted_results[0][0]
best_mse = sorted_results[0][1]
if best_direction == 'current':
# restore the test point, we're done
self.update_match_pt(picked, test_point)
# restore the starting point, we're done
self.update_match_pt(picked, current_point)
self.show_mse(picked, (corr, log_mse), console=False)
cv2.waitKey(1) # force a redraw
key = cv2.waitKey(1) # force a redraw
if was_coarse:
# now do a fine search
was_coarse = False
Expand All @@ -476,18 +479,18 @@ def mse_cleanup(self, picked):
offset = offsets[best_direction]
# define the new starting point in the "best" direction
steps += 1
starting_point = (starting_point[0] + offset[0], starting_point[1] + offset[1])
current_point = (current_point[0] + offset[0], current_point[1] + offset[1])
# follow gradient until it doesn't get better
follow_gradient = True
while follow_gradient:
test_point = (starting_point[0] + offset[0], starting_point[1] + offset[1])
test_point = (current_point[0] + offset[0], current_point[1] + offset[1])
self.update_match_pt(picked, test_point)
(corr, log_mse) = self.compute_mse(picked)
logging.debug(f"mse {log_mse}")
if log_mse > best_mse:
# reset back to our previous point
self.update_match_pt(picked, starting_point)
self.update_match_pt(picked, current_point)
# do a 4-way search if we hit a gradient end point
Expand All @@ -496,12 +499,13 @@ def mse_cleanup(self, picked):
# adopt the new result as "best"
steps += 1
starting_point = (starting_point[0] + offset[0], starting_point[1] + offset[1])
current_point = (current_point[0] + offset[0], current_point[1] + offset[1])
self.show_mse(picked, (corr, log_mse), console=False)
cv2.waitKey(1) # force a redraw
key = cv2.waitKey(1) # force a redraw
# fall through to next iteration, continuing the search"MSE refinement pass finished in {steps} steps")
return steps

# computes the best score at the current template for each ref image
def best_scoring_index(self, multiple=False):
Expand All @@ -524,18 +528,21 @@ def best_scoring_index(self, multiple=False):
# point in a region of "meh"-ness. Reject these points.
if score is None:
# first iteration, just pick the first thing we found
picked = i
best_template = template_index
score = soln_score
solns = num_solns
if soln_score is not None:
# HEURISTIC: if we've already got something that passes our basic threshold,
# reject any solution that is less unique
if score < FAILING_SCORE:
if num_solns > solns:
# HEURISTIC: reject any solution that is less unique, so long as
# our base score isn't failing
if soln_score < score:
# if the score is better, OR if it is substantially more unique,
# pick the new solution.
if soln_score < score or num_solns < solns / 2:
score = soln_score
picked = i
best_template = template_index
Expand Down Expand Up @@ -786,7 +793,7 @@ def compute_mse(self, i):
log_mse = round(log10(mse), 5)
return (corr, log_mse)

def show_mse(self, i, computed = None, console=True):
def show_mse(self, i, computed = None, console=True, draw=True):
if computed is None:
(corr, log_mse) = self.compute_mse(i)
Expand All @@ -799,18 +806,19 @@ def show_mse(self, i, computed = None, console=True):
mse_hint = ''
if console:"MSE: {log_mse} {mse_hint}")
corr_f32 = cv2.normalize(corr, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
(_retval, corr_f32) = cv2.threshold(corr_f32, 0.8, 0.8, cv2.THRESH_TRUNC) # toss out extreme outliers (e.g. bad pixels)
corr_f32 = cv2.normalize(corr_f32, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
# corr_u8 = cv2.normalize(corr_f32, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)
if draw:
corr_f32 = cv2.normalize(corr, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
(_retval, corr_f32) = cv2.threshold(corr_f32, 0.8, 0.8, cv2.THRESH_TRUNC) # toss out extreme outliers (e.g. bad pixels)
corr_f32 = cv2.normalize(corr_f32, None, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)
# corr_u8 = cv2.normalize(corr_f32, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_8U)

corr_f32, f"MSE: {log_mse} {mse_hint}",
org=(100, 100),
fontScale=2, color=(255, 255, 255), thickness=2, lineType=cv2.LINE_AA
cv2.imshow("Find minimum MSE", cv2.resize(corr_f32, None, None, 0.5, 0.5))
corr_f32, f"MSE: {log_mse} {mse_hint}",
org=(100, 100),
fontScale=2, color=(255, 255, 255), thickness=2, lineType=cv2.LINE_AA
cv2.imshow("Find minimum MSE", cv2.resize(corr_f32, None, None, 0.5, 0.5))

# Use template matching of laplacians to do stitching
def stitch_one_template(self,
Expand Down Expand Up @@ -878,14 +886,22 @@ def stitch_one_template(self,
elif mode == 'AUTO':
verdict = state.eval_index(picked)
if verdict == 'GOOD':
if mse_cleanup:
if full_review:
msg = 'Good (edge)'
mode = 'TEMPLATE' # force a review of everything
msg = ''
mode = None
if mse_cleanup:
steps = state.mse_cleanup(picked)
if steps >= MSE_SEARCH_LIMIT:
msg = "Couldn't cleanup MSE! Limit cycle reached."
mode = 'MOVE'
elif steps < 0:
msg = "Manual pause due to keystroke in MSE search"
mode = 'MOVE'
msg = f"Good (mse found in {steps} steps)"
elif verdict == 'AMBIGUOUS':
msg = 'Ambiguous'
mode = 'TEMPLATE'
Expand Down Expand Up @@ -1111,9 +1127,12 @@ def stitch_auto_template_linear(self, stitch_list=None, mse_cleanup=False):
x_list = sorted(x_list)
y_list = sorted(y_list)

# reset the anchor if we have been given a stitch list. The anchor should not be the first
# item in the list, because the first item in the list is stitched incorrectly.
if stitch_list is not None:
# reset the anchor if we have been given a stitch list. The anchor should *not* be the first
# item in the list, because the first item in the list is stitched incorrectly.
# This algorithm searches the entire database for candidates for anchors, and not just the
# passed-in stich_list
total_x_list = list(self.schema.x_list)
total_y_list = list(self.schema.y_list)
new_anchor_found = False
Expand Down Expand Up @@ -1146,7 +1165,55 @@ def stitch_auto_template_linear(self, stitch_list=None, mse_cleanup=False):
logging.warning("Couldn't anchor the manual stitch list: too many tiles to left and top missing or deleted. Attempting to stitch with top tile of selection as anchor.")
# QUESTION: should we try to find an anchor to the *right* in the case that we're doing a manual restitch?
# Maybe if a left column is too hard to align, but the rest of the stitch is excellent...?

# Full-chip stitch:
# Ensure that the left and top edges are fully stitched before proceeding.
# -- stitch left edge
x = x_list[0] # this list was previously sorted
(ref_layer, _ref_t) = self.schema.get_tile_by_coordinate(anchor)
for y in y_list[1:]: # y_list[1:] because top-left anchor
(moving_layer, moving_t) = self.schema.get_tile_by_coordinate(Point(x, y))
if moving_t['auto_error'] == 'invalid' or moving_t['auto_error'] == 'true':
restart, abort = self.stitch_one_template(
if abort:
return False
self.schema.overwrite() # save the schema after every step
if restart:
return restart
ref_layer = moving_layer
# -- stitch top edge
y = y_list[0]
(ref_layer, _ref_t) = self.schema.get_tile_by_coordinate(anchor)
for x in x_list[1:]: # x_list[1:] because top-left anchor
(moving_layer, moving_t) = self.schema.get_tile_by_coordinate(Point(x, y))
if moving_t['auto_error'] == 'invalid' or moving_t['auto_error'] == 'true':
restart, abort = self.stitch_one_template(
# edge cases always checked for manual review: it's impossible to get a good stitch
# if any of the edges are wrong.
if abort:
return False
self.schema.overwrite() # save the schema after every step
if restart:
return restart
ref_layer = moving_layer

# now stitch everything that hasn't already been stitched
last_y_top = anchor
next_tile = None
prev_x = None
Expand All @@ -1166,7 +1233,6 @@ def stitch_auto_template_linear(self, stitch_list=None, mse_cleanup=False):
# because the first tile we'd ever encounter should be the anchor!
edge_case = False
(ref_layer, ref_t) = self.schema.get_tile_by_coordinate(cur_tile)
if ref_t is None: # handle db deletions
cur_tile = next_tile
Expand All @@ -1184,7 +1250,6 @@ def stitch_auto_template_linear(self, stitch_list=None, mse_cleanup=False):
if y == last_y_top.y:
# on the top row, we actually want the left item and left-lower item as options, assuming
# we're not already at the very left
edge_case = True
(left_lower_layer, left_t) = self.schema.get_tile_by_coordinate(Point(prev_x, y + Schema.NOM_STEP))
if left_t is not None:
ref_layers = [ref_layer, left_lower_layer]
Expand All @@ -1196,36 +1261,21 @@ def stitch_auto_template_linear(self, stitch_list=None, mse_cleanup=False):
if left_t is not None:
ref_layers = [ref_layer, left_layer]
edge_case = True
ref_layers = [ref_layer]
ref_layers = [ref_layer]
edge_case = True
# in the case of an explicitly specified stitch list, don't flag any edge cases
# if you want to manually review everything in a restitch, use "Flag for Manual Review"
# in the UI on the selection first!
if stitch_list is not None:
edge_case = False
# Right now, we force a full review on top and left edges. This is because
# The correctness of these alignments have to be spot-on for everything else to work,
# and also because on the edges there is a strong chance that a template match
# picks something *not* on the chip to match on (some of the background matting
# or an edge artifact of the chip, which is poorly focused). Haven't figured out a
# good way to figure out how to avoid that -- maybe some specialized algorithm that
# knows we're in an edge case and looks for stuff only to the inside of the brightest
# line that would define the chip edge?

restart, abort = self.stitch_one_template(
if abort:
return False
if True: #retrench:
self.schema.overwrite() # possibly dubious call to save the schema every time after manual patch-up
self.schema.overwrite() # save the schema after every step
if restart:
return restart
cur_tile = next_tile
Expand Down

0 comments on commit 502a13c

Please sign in to comment.