我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用cv2.calcOpticalFlowFarneback()。
def optical_flow(one, two): """ method taken from (https://chatbotslife.com/autonomous-vehicle-speed-estimation-from-dashboard-cam-ca96c24120e4) """ one_g = cv2.cvtColor(one, cv2.COLOR_RGB2GRAY) two_g = cv2.cvtColor(two, cv2.COLOR_RGB2GRAY) hsv = np.zeros((120, 320, 3)) # set saturation hsv[:,:,1] = cv2.cvtColor(two, cv2.COLOR_RGB2HSV)[:,:,1] # obtain dense optical flow paramters flow = cv2.calcOpticalFlowFarneback(one_g, two_g, flow=None, pyr_scale=0.5, levels=1, winsize=15, iterations=2, poly_n=5, poly_sigma=1.1, flags=0) # convert from cartesian to polar mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1]) # hue corresponds to direction hsv[:,:,0] = ang * (180/ np.pi / 2) # value corresponds to magnitude hsv[:,:,2] = cv2.normalize(mag,None,0,255,cv2.NORM_MINMAX) # convert HSV to int32's hsv = np.asarray(hsv, dtype= np.float32) rgb_flow = cv2.cvtColor(hsv,cv2.COLOR_HSV2RGB) return rgb_flow
def readImg(FileList, data_shape): mat = [] tmp = 0 ret = len(FileList)/(NUM_SAMPLES+1) for i in range(ret): for j in range(NUM_SAMPLES): index = i * (NUM_SAMPLES+1) + j img_1 = cv2.imread(FileList[index], 0) img_11 = cv2.resize(img_1, (data_shape[2], data_shape[1])) img_111 = np.multiply(img_11, 1/255.0) img_2 = cv2.imread(FileList[index+1], 0) img_22 = cv2.resize(img_2, (data_shape[2], data_shape[1])) img_222 = np.multiply(img_22, 1/255.0) flow = cv2.calcOpticalFlowFarneback(img_111, img_222, 0.5, 3, 15, 3, 5, 1.2, 0) flow = np.array(flow) flow_1 = flow.transpose((2,1,0)) flow_1 = flow_1.tolist() mat.append(flow_1) return mat
def dense_optical_flow(im1, im2, pyr_scale=0.5, levels=3, winsize=5, iterations=3, poly_n=5, poly_sigma=1.2, fb_threshold=-1, mask1=None, mask2=None, flow1=None, flow2=None): if flow1 is None: fflow = cv2.calcOpticalFlowFarneback(to_gray(im1), to_gray(im2), pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, 0) else: fflow = cv2.calcOpticalFlowFarneback(to_gray(im1), to_gray(im2), pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, 0, flow1.copy()) if mask1 is not None: fflow[~mask1.astype(np.bool)] = np.nan if fb_threshold > 0: H, W = im1.shape[:2] xs, ys = np.meshgrid(np.arange(W), np.arange(H)) xys1 = np.dstack([xs, ys]) xys2 = xys1 + fflow rflow = dense_optical_flow(im2, im1, pyr_scale=pyr_scale, levels=levels, winsize=winsize, iterations=iterations, poly_n=poly_n, poly_sigma=poly_sigma, fb_threshold=-1) if mask2 is not None: rflow[~mask2.astype(np.bool)] = np.nan xys1r = xys2 + rflow fb_bad = (np.fabs(xys1r - xys1) > fb_threshold).all(axis=2) fflow[fb_bad] = np.nan return fflow
def optical_flow(one, two): """ method taken from https://chatbotslife.com/autonomous-vehicle-speed-estimation-from-dashboard-cam-ca96c24120e4 input: image_current, image_next (RGB images) calculates optical flow magnitude and angle and places it into HSV image """ one_g = cv2.cvtColor(one, cv2.COLOR_RGB2GRAY) two_g = cv2.cvtColor(two, cv2.COLOR_RGB2GRAY) hsv = np.zeros((120, 320, 3)) # set saturation hsv[:,:,1] = cv2.cvtColor(two, cv2.COLOR_RGB2HSV)[:,:,1] # obtain dense optical flow paramters flow = cv2.calcOpticalFlowFarneback(one_g, two_g, flow=None, pyr_scale=0.5, levels=1, winsize=10, iterations=2, poly_n=5, poly_sigma=1.1, flags=0) # convert from cartesian to polar mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1]) # hue corresponds to direction hsv[:,:,0] = ang * (180/ np.pi / 2) # value corresponds to magnitude hsv[:,:,2] = cv2.normalize(mag,None,0,255,cv2.NORM_MINMAX) # convert HSV to int32's hsv = np.asarray(hsv, dtype= np.float32) rgb_flow = cv2.cvtColor(hsv,cv2.COLOR_HSV2RGB) return rgb_flow
def calculate_flow(self, frame_a, frame_b): previous_frame = cv2.cvtColor(frame_a, cv2.COLOR_BGR2GRAY) next_frame = cv2.cvtColor(frame_b, cv2.COLOR_BGR2GRAY) flow = cv2.calcOpticalFlowFarneback( previous_frame, next_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0 ) # Change here horz = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX) vert = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX) horz = horz.astype('uint8') vert = vert.astype('uint8') # Change here too cv2.imshow('Horizontal Component', horz) cv2.imshow('Vertical Component', vert) k = cv2.waitKey(0) & 0xff if k == ord('s'): # Change here cv2.imwrite('opticalflow_horz.pgm', horz) cv2.imwrite('opticalflow_vert.pgm', vert) cv2.destroyAllWindows()
def extract_optical_flow(fn, times, frames=8, scale_factor=1.0): cap = cv2.VideoCapture(fn) n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) outputs = [] if n_frames < frames * 2: return outputs def resize(im): if scale_factor != 1.0: new_size = (int(im.shape[1] * scale_factor), int(im.shape[0] * scale_factor)) return cv2.resize(im, new_size, interpolation=cv2.INTER_LINEAR) else: return im for t in times: cap.set(cv2.CAP_PROP_POS_FRAMES, min(t * n_frames, n_frames - 1 - frames)) ret, frame0 = cap.read() im0 = resize(cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)) mags = [] middle_frame = frame0 for f in range(frames - 1): ret, frame1 = cap.read() if f == frames // 2: middle_frame = frame1 im1 = resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)) flow = cv2.calcOpticalFlowFarneback(im0, im1, None, 0.5, 3, 15, 3, 5, 1.2, 0) mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1]) mags.append(mag) im0 = im1 mag = np.sum(mags, 0) mag = mag.clip(min=0) norm_mag = (mag - mag.min()) / (mag.max() - mag.min() + 1e-5) x = middle_frame[..., ::-1].astype(np.float32) / 255 outputs.append((x, norm_mag)) return outputs
def extract_optical_flow(fn, n_frames=34): img = dd.image.load(fn) if img.shape != (128*34, 128, 3): return [] frames = np.array_split(img, 34, axis=0) grayscale_frames = [fr.mean(-1) for fr in frames] mags = [] skip_frames = np.random.randint(34 - n_frames + 1) middle_frame = frames[np.random.randint(skip_frames, skip_frames+n_frames)] im0 = grayscale_frames[skip_frames] for f in range(1+skip_frames, 1+skip_frames+n_frames-1): im1 = grayscale_frames[f] flow = cv2.calcOpticalFlowFarneback(im0, im1, None, # flow 0.5, # pyr_scale 3, # levels np.random.randint(3, 20), # winsize 3, #iterations 5, #poly_n 1.2, #poly_sigma 0 # flags ) mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1]) mags.append(mag) im0 = im1 mag = np.sum(mags, 0) mag = mag.clip(min=0) #norm_mag = np.tanh(mag * 10000) norm_mag = (mag - mag.min()) / (mag.max() - mag.min() + 1e-5) outputs = [] outputs.append((middle_frame, norm_mag)) return outputs
def _calc_optical_flow(prev, next_): flow = cv2.calcOpticalFlowFarneback(prev, next_, flow=None, pyr_scale=0.5, levels=3, winsize=15, iterations=3, poly_n=5, poly_sigma=1.2, flags=0) return flow
def test_flow(img1, img2): gray1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY) flow = cv2.calcOpticalFlowFarneback(gray1, gray2, 0.5, 3, 15, 3, 5, 1.2, 0) cv2.imshow('flow', draw_flow(gray1, flow)) cv2.imshow('flow HSV', draw_hsv(flow)) cv2.imshow('warp', warp_flow(cur_glitch, flow))
def track(self, im0, im1, p0): if p0 is None or not len(p0): return np.array([]) fflow = cv2.calcOpticalFlowFarneback(im0, im1, **self.farneback_params_) fflow = cv2.medianBlur(fflow, 5) # Initialize forward flow and propagated points p1 = np.ones(shape=p0.shape) * np.nan flow_p0 = np.ones(shape=p0.shape) * np.nan flow_good = np.ones(shape=p0.shape, dtype=bool) # Check finite value for pts, and within image bounds valid0 = finite_and_within_bounds(p0, im0.shape) # Determine finite flow at points xys0 = p0[valid0].astype(int) flow_p0[valid0] = fflow[xys0[:,1], xys0[:,0]] # Propagate p1 = p0 + flow_p0 # FWD-BWD check if self.fb_check_: # Initialize reverse flow and propagated points p0r = np.ones(shape=p0.shape) * np.nan flow_p1 = np.ones(shape=p0.shape) * np.nan rflow = cv2.calcOpticalFlowFarneback(im1, im0, **self.farneback_params_) rflow = cv2.medianBlur(rflow, 5) # Check finite value for pts, and within image bounds valid1 = finite_and_within_bounds(p1, im0.shape) # Determine finite flow at points xys1 = p1[valid1].astype(int) flow_p1[valid1] = rflow[xys1[:,1], xys1[:,0]] # Check diff p0r = p1 + flow_p1 fb_good = (np.fabs(p0r-p0) < 3).all(axis=1) # Set only good flow flow_p0[~fb_good] = np.nan p1 = p0 + flow_p0 return p1
def calculateOpticalFlowsForDataset(self, image_reference, dataset): # optical flows will be stored here list_optical_flows = [] # add zero optical flow for the reference image which is at first position shape_image_reference = image_reference.shape shape_optical_flow = [shape_image_reference[0], shape_image_reference[1], 2] zero_optical_flow = np.zeros(shape_optical_flow, np.float32) list_optical_flows.append(zero_optical_flow) # iterate through the dataset and calculate the optical flow for each # except the first one num_images = dataset.getImageCount() for index in range(1, num_images): print ("calculating optical flow for image ", index) # Get the image at the index data = dataset.getData(index) hdu_image = data["hdu_list"][self.extension].data image = CommonFunctions.preprocessHduImage(hdu_image, self.scale_factor) # @todo: here, do not use config but simply check if matrix is None # apply the transformation to the input image if self.config["Processing_Options"]["align_images"] == "True": # get the image dimension image_shape = image.shape # Transform the Image image = cv2.warpAffine(image, data["transform_matrix"], (image_shape[1],image_shape[0]), flags=cv2.INTER_CUBIC + cv2.WARP_INVERSE_MAP) # calculate the optical flow (backwards for warping!) optical_flow = cv2.calcOpticalFlowFarneback(image_reference, image, None, self.pyr_scale, self.levels, self.winsize, self.iterations, self.poly_n, self.poly_sigma, cv2.OPTFLOW_FARNEBACK_GAUSSIAN) # Write out optical flow images for user evaluation self.writeOpticalFlowImage(index, optical_flow) list_optical_flows.append(optical_flow) return list_optical_flows ## Average a list of optical flows # @param list_optical_flows list object containing optical flows as numpy arrays # @return averaged optical flow as numpy array
def extract_optical_flow(fn, times, frames=8, scale_factor=1.0): cap = cv2.VideoCapture(fn) if not cap.isOpened(): return [] n_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) outputs = [] if n_frames < frames * 2: return outputs def resize(im): if scale_factor != 1.0: new_size = (int(im.shape[1] * scale_factor), int(im.shape[0] * scale_factor)) return cv2.resize(im, new_size, interpolation=cv2.INTER_LINEAR) else: return im for t in times: cap.set(cv2.CAP_PROP_POS_FRAMES, min(t * n_frames, n_frames - 1 - frames)) ret, frame0 = cap.read() im0 = resize(cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)) mags = [] middle_frame = frame0 flows = [] for f in range(frames - 1): ret, frame1 = cap.read() if f == frames // 2: middle_frame = frame1 im1 = resize(cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)) flow = cv2.calcOpticalFlowFarneback(im0, im1, None, 0.5, # py_scale 8, # levels int(40 * scale_factor), # winsize 10, # iterations 5, # poly_n 1.1, # poly_sigma cv2.OPTFLOW_FARNEBACK_GAUSSIAN) #mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1]) #mags.append(mag) flows.append(flow) im0 = im1 flow = (np.mean(flows, 0) / 100).clip(-1, 1) #flow = np.mean(flows, 0) #flow /= (flow.mean() * 5 + 1e-5) #flow = flow.clip(-1, 1) #flows = flows / (np.mean(flows, 0, keepdims=True) + 1e-5) x = middle_frame[..., ::-1].astype(np.float32) / 255 outputs.append((x, flow)) return outputs
def writeOpticalFlow(path,filename,w,h,c): count=0 try: cap = cv2.VideoCapture(path+'/'+filename) ret, frame1 = cap.read() if frame1==None: return count frame1 = cv2.resize(frame1, (w,h)) prvs = cv2.cvtColor(frame1,cv2.COLOR_BGR2GRAY) if not os.path.isdir("../dataset/of_images"): os.mkdir("of_images") folder = '../dataset/of_images'+'/'+filename+'/' dir = os.path.dirname(folder) os.mkdir(dir) while(1): ret, frame2 = cap.read() if frame2 is None: break count+=1 if count%5==0: print (filename+':' +str(c)+'-'+str(count)) frame2 = cv2.resize(frame2, (w,h)) next = cv2.cvtColor(frame2,cv2.COLOR_BGR2GRAY) flow = cv2.calcOpticalFlowFarneback(prvs,next, None, 0.5, 3, 15, 3, 5, 1.2, 0) horz = cv2.normalize(flow[...,0], None, 0, 255, cv2.NORM_MINMAX) vert = cv2.normalize(flow[...,1], None, 0, 255, cv2.NORM_MINMAX) horz = horz.astype('uint8') vert = vert.astype('uint8') cv2.imwrite(folder+'h'+str(count)+'_'+filename+'.jpg',horz,[int(cv2.IMWRITE_JPEG_QUALITY), 90]) cv2.imwrite(folder+'v'+str(count)+'_'+filename+'.jpg',vert,[int(cv2.IMWRITE_JPEG_QUALITY), 90]) prvs = next cap.release() cv2.destroyAllWindows() return count except Exception as e: return count
def writeOpticalFlow(path,filename,w,h,c): count=0 try: cap = cv2.VideoCapture(path+'/'+filename) ret, frame1 = cap.read() if frame1==None: return count frame1 = cv2.resize(frame1, (w,h)) prvs = cv2.cvtColor(frame1,cv2.COLOR_BGR2GRAY) folder = './of_images'+'/'+filename+'/' dir = os.path.dirname(folder) os.mkdir(dir) while(1): ret, frame2 = cap.read() if frame2==None: break count+=1 if count%5==0: print (filename+':' +str(c)+'-'+str(count)) frame2 = cv2.resize(frame2, (w,h)) next = cv2.cvtColor(frame2,cv2.COLOR_BGR2GRAY) flow = cv2.calcOpticalFlowFarneback(prvs,next, None, 0.5, 3, 15, 3, 5, 1.2, 0) horz = cv2.normalize(flow[...,0], None, 0, 255, cv2.NORM_MINMAX) vert = cv2.normalize(flow[...,1], None, 0, 255, cv2.NORM_MINMAX) horz = horz.astype('uint8') vert = vert.astype('uint8') cv2.imwrite(folder+'h'+str(count)+'_'+filename+'.jpg',horz,[int(cv2.IMWRITE_JPEG_QUALITY), 90]) cv2.imwrite(folder+'v'+str(count)+'_'+filename+'.jpg',vert,[int(cv2.IMWRITE_JPEG_QUALITY), 90]) prvs = next cap.release() cv2.destroyAllWindows() return count except Exception,e: print e return count