我们从 Python 开源项目中提取了以下 8 个代码示例,用于说明如何使用 cv2.calcOpticalFlowPyrLK()。
def sparse_optical_flow(im1, im2, pts, fb_threshold=-1, window_size=15, max_level=2,
                        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)):
    """Track ``pts`` from ``im1`` to ``im2`` with pyramidal Lucas-Kanade flow.

    When ``fb_threshold`` is positive a forward-backward consistency check is
    applied: the tracked points are flowed back from ``im2`` to ``im1`` and a
    point is kept only if the backward track succeeded and every coordinate of
    the round trip lands within ``fb_threshold`` of its starting position.

    Parameters
    ----------
    im1, im2 : images handed straight to ``cv2.calcOpticalFlowPyrLK``
    pts : points to track; assumed to be a float32 array laid out as (N, 2)
        or (N, 1, 2) -- TODO confirm against callers
    fb_threshold : per-coordinate round-trip tolerance; values <= 0 disable
        the forward-backward check
    window_size : side length of the square LK search window
    max_level : ``maxLevel`` passed to the LK call
    criteria : termination criteria passed to the LK call

    Returns
    -------
    (p1, st, err) : forward-tracked points, status and forward error as
        returned by OpenCV.  With the check enabled, rows of ``p1`` and
        ``err`` that fail it are set to NaN and ``st`` is the bitwise AND of
        the forward and backward status arrays.
    """
    lk_params = dict(winSize=(window_size, window_size),
                     maxLevel=max_level,
                     criteria=criteria)

    # Forward flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(im1, im2, pts, None, **lk_params)

    if fb_threshold > 0:
        # Backward flow. Its error output is discarded so that the returned
        # `err` stays the forward error that belongs to `p1`.
        p0r, st0, _ = cv2.calcOpticalFlowPyrLK(im2, im1, p1, None, **lk_params)

        # Compare against the ORIGINAL points (`pts`). Flatten everything to
        # one row per point so the mask works for both (N, 2) and (N, 1, 2)
        # layouts and for a status array shaped (N, 1).
        fb_dist = np.fabs(p0r - np.reshape(pts, p0r.shape)).reshape(-1, 2)
        fb_good = (fb_dist < fb_threshold).all(axis=1) & (st0.reshape(-1) != 0)

        # Keep only the points that survived the round trip
        p1[~fb_good] = np.nan
        st = np.bitwise_and(st, st0)
        err[~fb_good] = np.nan

    return p1, st, err
def track(self, im0, im1, p0):
    """Track points ``p0`` from ``im0`` to ``im1`` using sparse LK optical flow.

    Points that could not be tracked are returned as NaN.  When
    ``self.fb_check_`` is truthy a forward-backward check is also applied:
    the tracked points are flowed back to ``im0`` and any point whose
    backward track failed, or whose round trip moved 3 or more units in
    either coordinate, is set to NaN as well.

    Parameters
    ----------
    im0, im1 : images handed straight to ``cv2.calcOpticalFlowPyrLK``
    p0 : points to track; assumed to be a float32 array laid out as (N, 2)
        or (N, 1, 2) -- TODO confirm against callers

    Returns
    -------
    An empty ``np.array([])`` when ``p0`` is ``None`` or empty, otherwise the
    forward-tracked points with rejected rows set to NaN.
    """
    if p0 is None or not len(p0):
        return np.array([])

    # Forward flow
    p1, st1, _ = cv2.calcOpticalFlowPyrLK(im0, im1, p0, None, **self.lk_params_)

    # One boolean per point; flattening the status keeps the mask valid no
    # matter whether the points are laid out as (N, 2) or (N, 1, 2).
    rejected = st1.reshape(-1) == 0

    if self.fb_check_:
        # Backward flow
        p0r, st0, _ = cv2.calcOpticalFlowPyrLK(im1, im0, p1, None, **self.lk_params_)
        rejected |= st0.reshape(-1) == 0

        # Round-trip displacement per coordinate, one row per point
        fb_dist = np.fabs(p0r - np.reshape(p0, p0r.shape)).reshape(-1, 2)
        rejected |= ~(fb_dist < 3).all(axis=1)

    p1[rejected] = np.nan
    return p1
def feature_tracking(image_ref, image_cur, px_ref):
    """Track reference pixels into the current image with LK optical flow.

    Parameters
    ----------
    image_ref : np.array
        Reference image
    image_cur : np.array
        Current image
    px_ref :
        Reference pixels

    Returns
    -------
    (kp1, kp2) :
        The entries of ``px_ref`` whose tracking status is 1, and the
        matching tracked positions in ``image_cur``.
    """
    # Fixed LK configuration: 21x21 window, 3 pyramid levels, stop after
    # 30 iterations or once the update falls below 0.01
    stop_criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.01)
    px_cur, status, _ = cv2.calcOpticalFlowPyrLK(
        image_ref, image_cur, px_ref, None,
        winSize=(21, 21), maxLevel=3, criteria=stop_criteria,
    )

    # Collapse the status to one flag per point and keep the successes
    tracked = status.reshape(status.shape[0]) == 1
    return px_ref[tracked], px_cur[tracked]
def update(self, frame, events):
    """Follow the sampled point for one frame and collect calibration data.

    Does nothing unless ``self.active`` is set.  While ``self.count`` is
    non-zero the point in ``self.point`` is tracked from ``self.first_img``
    into the current gray frame; on success a reference sample is appended
    to ``self.ref_list`` and ``self.count`` is decremented.  Pupil positions
    above the confidence threshold are always appended to
    ``self.pupil_list`` and the button status text is refreshed.
    """
    if not self.active:
        return

    pupil_positions = events['pupil_positions']

    if self.first_img is None:
        self.first_img = frame.gray.copy()

    self.detected = False

    if self.count:
        gray = frame.gray
        # in cv2.3 nextPts is falsely required as an argument, so a copy of
        # the current point is passed as a placeholder.
        tracked, status, _ = cv2.calcOpticalFlowPyrLK(
            self.first_img, gray, self.point, self.point.copy(),
            winSize=(100, 100))

        if status[0]:
            self.detected = True
            self.point = tracked
            self.first_img = gray.copy()

            screen_pos = tracked[0]
            frame_size = (gray.shape[1], gray.shape[0])
            self.pos = normalize(screen_pos, frame_size, flip_y=True)
            self.count -= 1

            self.ref_list.append({
                "screen_pos": screen_pos,
                "norm_pos": self.pos,
                "timestamp": frame.timestamp,
            })

    # always save pupil positions
    self.pupil_list.extend(
        p_pt for p_pt in pupil_positions
        if p_pt['confidence'] > self.pupil_confidence_threshold)

    self.button.status_text = (
        'Sampling Gaze Data' if self.count else 'Click to Sample at Location')
def process_new_frame(self, new_frame):
    """Match the previous frame's features into ``new_frame`` and publish.

    The frame is converted to grayscale.  If a previous gray frame is
    stored, the stored features are tracked into the new frame with LK
    optical flow and the result is passed to
    ``self.publish_interframe_motion``.  In every case the new gray frame
    is then stored via ``self.store_as_last_frame``.
    """
    gray = cv2.cvtColor(new_frame, cv2.COLOR_BGR2GRAY)

    # The very first frame has nothing to be matched against
    if self.last_frame_gray is not None:
        matched, status, error = cv2.calcOpticalFlowPyrLK(
            self.last_frame_gray,
            gray,
            self.last_frame_features,
            None,
            **self.lk_params
        )
        self.publish_interframe_motion(
            self.last_frame_features, matched, status, error
        )

    # remember this frame for the next call
    self.store_as_last_frame(gray)
def update(self,frame,events):
    """Carry earlier gaze points forward and merge them into this frame's gaze.

    On a directly succeeding frame the stored gaze points are moved with LK
    optical flow from ``self.prev_gray`` to the current image; points that
    fail to track are dropped.  On a repeated frame index the previous
    result is re-used unchanged.  ``events['gaze_positions']`` is rewritten
    in place and the state needed for the next call is stored on ``self``.
    """
    img = frame.img
    img_shape = img.shape[:-1][::-1] # width,height

    # Classify this call relative to the previously processed frame index
    succeeding_frame = frame.index-self.prev_frame_idx == 1
    same_frame = frame.index == self.prev_frame_idx
    gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # parameters for calcOpticalFlowPyrLK
    lk_params = dict( winSize = (90, 90), maxLevel = 3, criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 20, 0.03))

    updated_past_gaze = []

    # Update past gaze using optical flow: this is like sticking the gaze
    # points onto the pixels of the image.
    if self.past_gaze_positions and succeeding_frame:
        # NOTE(review): denormalize/normalize are defined elsewhere; they
        # presumably convert between normalized and pixel coordinates.
        past_screen_gaze = np.array([denormalize(ng['norm_pos'] ,img_shape,flip_y=True) for ng in self.past_gaze_positions],dtype=np.float32)
        new_pts, status, err = cv2.calcOpticalFlowPyrLK(self.prev_gray,gray_img,past_screen_gaze,None,minEigThreshold=0.005,**lk_params)
        for gaze,new_gaze_pt,s,e in zip(self.past_gaze_positions,new_pts,status,err):
            if s:
                # Tracked: overwrite the stored position (the dict is
                # mutated in place) and keep the point.
                gaze['norm_pos'] = normalize(new_gaze_pt,img_shape,flip_y=True)
                updated_past_gaze.append(gaze)
            else:
                # Since self.past_gaze_positions is replaced below, not
                # appending to updated_past_gaze drops this data point.
                pass
    else:
        # Either seeking (do not attempt optical flow) or paused (handled
        # by the same_frame branch below).
        pass

    if same_frame:
        # paused: re-use the last result
        events['gaze_positions'][:] = self.past_gaze_positions[:]
    else:
        # Trim gaze that is too old, measured against the first gaze
        # timestamp of the current frame.
        if events['gaze_positions']:
            now = events['gaze_positions'][0]['timestamp']
            cutoff = now-self.timeframe
            updated_past_gaze = [g for g in updated_past_gaze if g['timestamp']>cutoff]

        # Inject the scan path gaze points into the current gaze positions
        # (slice assignment keeps the caller's list object).
        events['gaze_positions'][:] = updated_past_gaze + events['gaze_positions']
        events['gaze_positions'].sort(key=lambda x: x['timestamp']) # this may be redundant...

    # Update info for the next frame.
    self.prev_gray = gray_img
    self.prev_frame_idx = frame.index
    self.past_gaze_positions = events['gaze_positions']
def track_features(self, image_ref, image_cur):
    """Advance the live feature tracks from ``image_ref`` to ``image_cur``.

    Parameters
    ----------
    image_ref : np.array
        Reference image
    image_cur : np.array
        Current image

    If fewer than ``self.min_nb_features`` tracks are alive, the live list
    is cleared and ``self.detect`` is run on the reference image first.
    The last keypoints are then tracked with LK optical flow; every track
    whose status is 1 receives a new ``KeyPoint`` for ``self.frame_id`` and
    stays in ``self.tracks_tracking``, all others are dropped from it.
    """
    # Too few live tracks: start over with freshly detected features
    if len(self.tracks_tracking) < self.min_nb_features:
        self.tracks_tracking = []
        self.detect(image_ref)

    # LK configuration: 21x21 window, 2 pyramid levels, stop after 30
    # iterations or once the update falls below 0.03
    stop_criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 30, 0.03)

    # Reference keypoints of the tracks that are still alive
    self.kp_ref = self.last_keypoints()

    self.kp_cur, statuses, _ = cv2.calcOpticalFlowPyrLK(
        image_ref, image_cur, self.kp_ref, None,
        winSize=(21, 21), maxLevel=2, criteria=stop_criteria)

    # Keep only the tracks that LK reports as successfully followed
    flags = statuses.reshape(statuses.shape[0])
    survivors = []
    for idx, flag in enumerate(flags):
        if flag != 1:
            continue
        track_id = self.tracks_tracking[idx]
        survivors.append(track_id)
        new_kp = KeyPoint(self.kp_cur[idx], 0)
        self.tracks[track_id].update(self.frame_id, new_kp)

    self.tracks_tracking = survivors
def calculate_sift(self, last_frame, new_frame, last_kp=None):
    """Match stored descriptors against a scene, then run LK optical flow.

    NOTE(review): as written this method cannot run.  It reads ``k``,
    ``scene_desc``, ``scene_kps`` and ``frame_gray``, none of which are
    defined in this block, and it never uses its own parameters
    (``last_frame``, ``new_frame``, ``last_kp``).  It looks like two
    routines pasted together -- confirm the intended inputs before use.

    Returns ``None`` early when no descriptor match passes the ratio test;
    otherwise falls through and returns nothing.
    """
    # find corresponding points in the input image and the template image
    bf = cv2.BFMatcher()
    # NOTE(review): `k` (index into self.descs) and `scene_desc` are undefined here.
    matches = bf.knnMatch(self.descs[k], scene_desc, k=2)

    # Apply the Lowe ratio test to the keypoints; this should weed out
    # unsure matches: keep a match only if it is clearly better than the
    # second-best candidate.
    good_keypoints = []
    for m, n in matches:
        if m.distance < self.good_thresh * n.distance:
            good_keypoints.append(m)

    # put keypoints from the template image in template_pts
    # (transform the keypoint data into arrays for a homography check,
    # using the precomputed points in self.kps)
    # NOTE(review): template_pts is computed but not used afterwards.
    template_pts = np.float32( [self.kps[k][m.queryIdx].pt for m in good_keypoints] ).reshape(-1, 1, 2)

    # put corresponding keypoints from the input image in scene_img_pts
    # NOTE(review): `scene_kps` is undefined here.
    scene_img_pts = np.float32( [scene_kps[m.trainIdx].pt for m in good_keypoints] ).reshape(-1, 1, 2)

    # if we can't find any matching keypoints, bail
    # (probably the scene image was nonexistent or very poor)
    if scene_img_pts.shape[0] == 0:
        return None

    # use OpenCV to calculate optical flow
    # NOTE(review): `frame_gray` is undefined here; the matched points above
    # are not passed into this call.
    new_frame_matched_features, status, error = cv2.calcOpticalFlowPyrLK( self.last_frame_gray, frame_gray, self.last_frame_features, None, **self.lk_params )
    self.publish_interframe_motion( self.last_frame_features, new_frame_matched_features, status, error )

    # save data for next frame
    self.store_as_last_frame(frame_gray)