我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cv2.imencode()。
def do_tarfile_save(self, tf): """ Write images and calibration solution to a tarfile object """ def taradd(name, buf): s = StringIO(buf) ti = tarfile.TarInfo(name) ti.size = len(s.getvalue()) ti.uname = 'calibrator' ti.mtime = int(time.time()) tf.addfile(tarinfo=ti, fileobj=s) ims = [("left-%04d.png" % i, im) for i,(_, im) in enumerate(self.db)] for (name, im) in ims: taradd(name, cv2.imencode(".png", im)[1].tostring()) if self.calibrated: taradd('ost.yaml', self.yaml()) taradd('ost.txt', self.ost()) else: print("Doing none-calibration tarfile save!")
def capture(self, opened_cameras): self.opened_cameras = opened_cameras while 1: try: ret, frame = self.connection.read() data = cv2.imencode('.jpg', frame)[1].tostring() if len(self.socket): for c in self.socket: self.send(c,data) else: self.connection.release() del self.opened_cameras[self.connections[1]] exit(0) # self.connections[1].close() except KeyboardInterrupt: self.signal_handler()
def write(): os.remove(filename) cap = cv2.VideoCapture(0) db = shelve.open(filename) imgs = [] data = range(100) for i in range(100): ret, frame = cap.read() if ret: # jpg = frame # 29 MB # jpg = cv2.imencode('.jpg', frame) # make much smaller (1.9MB), otherwise 29MB jpg = cv2.imencode('.jpg', frame)[1].tostring() # no bennefit with doing string (1.9MB) imgs.append(jpg) print('frame[{}] {}'.format(i, frame.shape)) time.sleep(0.03) db['imgs'] = imgs db['data'] = data cap.release() db.close()
def interactive_save(image): img_str = cv2.imencode('.png', image)[1].tostring() imgpil = Image.open(StringIO(img_str)) root = Tkinter.Tk() root.geometry('{}x{}'.format(400, 400)) imgtk = ImageTk.PhotoImage(image=imgpil) panel = Tkinter.Label(root, image=imgtk) #.pack() panel.pack(side="bottom", fill="both", expand="yes") Tkinter.Button(root, text="Hello!").pack() save_to = tkSimpleDialog.askstring("Save cropped image", "Enter filename") if save_to: if save_to.find('.') == -1: save_to += '.png' print 'Save to:', save_to cv2.imwrite(save_to, image) root.destroy()
def cv2_imshow(image, name='name'): """Display an OpenCV image (2D NumPy array) through the Udacity IDE.""" global max_images, num_images if num_images >= max_images: print("WARNING: Exceeded max no. of imshow() calls ({}), no more images will be shown.".format(max_images)) return num_images += 1 ext = 'png' # encodedImage = cv2.cv.EncodeImage('.' + ext, cv2.cv.fromarray(image)) # OpenCV 2.3.x # bytes = encodedImage.tostring() retval, bytes = cv2.imencode('.' + ext, image) # OpenCV 2.4.x output_image(name, ext, bytes)
def start(self): """ Create stream object. :return: stream """ if self.protocol is "image": image = cv2.imread(self.ip_address, 1) plate = self.analize_plate.proccess( cv2.imencode('.jpg', image)[1].tostring()) if plate: print plate['results'] else: stream = cv2.VideoCapture(self.url) self.proccess(stream) # return stream
def write_log(self, results): """Process results Args: results: y_out, s_out """ inp = results['_batches'][0] y_out = results['y_out'] s_out = results['s_out'] with h5py.File(self.dataset.h5_fname, 'r+') as h5f: print inp['idx_map'] for ii in xrange(y_out.shape[0]): idx = inp['idx_map'][ii] group = h5f[self.dataset.get_str_id(idx)] if 'instance_pred' in group: del group['instance_pred'] for ins in xrange(y_out.shape[1]): y_out_arr = y_out[ii, ins] y_out_arr = (y_out_arr * 255).astype('uint8') y_out_str = cv2.imencode('.png', y_out_arr)[1] group['instance_pred/{:02d}'.format(ins)] = y_out_str if 'score_pred' in group: del group['score_pred'] group['score_pred'] = s_out[ii]
def save_seg(self, seg_id, seg, group): seg_str = cv2.imencode(".png", seg)[1] key = "label_ins_seg/{:03d}".format(seg_id) self.save(key, seg_str, group) pass
def send_stereo_camera(self): # Black and white image is about 225K # That should consume about 2 seconds worth of bandwidth; hopefully be okay self.cloud = self.zarj.eyes.get_stereo_cloud() img, self.img_details = self.zarj.eyes.get_cloud_image_with_details(self.cloud) gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) (_, png) = cv2.imencode(".png", gray) picturemsg = ZarjPicture("Image of satellite hands", png, True) picturemsg.time = rospy.get_time() self.points = [ None, None ] self.zarj_comm.push_message(picturemsg)
def get_data(self): def encoder(img): return np.asarray(bytearray(cv2.imencode('.jpg', img)[1].tostring()), dtype=np.uint8) video_list = get_video(self.pattern, passes=10000, rng=self.rng) frame_list = get_random_sharp_frames(video_list, window_size=self.window_size, number_of_picked_frames=30, rng=self.rng) for b, s in get_good_patches(frame_list, number_of_picked_patches=10, rng=self.rng): values = [] for i in range(self.window_size): b_enc = encoder(b[i]) values.append(b_enc) for i in range(self.window_size): s_enc = encoder(s[i]) values.append(s_enc) yield values self.nr_examples -= 1 if self.nr_examples == 0: break
def run(self): global lastFrame global lockFrame # This method runs in a separate thread while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and save globally self.stream.seek(0) flippedArray = cv2.flip(self.stream.array, -1) # Flips X and Y retval, thisFrame = cv2.imencode('.jpg', flippedArray) del flippedArray lockFrame.acquire() lastFrame = thisFrame lockFrame.release() finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Image capture thread
def video_emitter(video): # Open the video video = cv2.VideoCapture(video) print(' emitting.....') # read the file while (video.isOpened): # read the image in each frame success, image = video.read() # check if the file has read the end if not success: break # convert the image png ret, jpeg = cv2.imencode('.png', image) # Convert the image to bytes and send to kafka producer.send_messages(topic, jpeg.tobytes()) # To reduce CPU usage create sleep time of 0.2sec time.sleep(0.2) # clear the capture video.release() print('done emitting')
def liveVideoServer(cam_liveWeb_frame_Queue): app = Flask(__name__) def gen(): while True: frame = cam_liveWeb_frame_Queue.get() frame = writeToLiveFrame(frame, RPiName) image = cv2.imencode('.jpg', frame)[1].tostring() yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + image + b'\r\n') @app.route("/liveVideoFeed") def liveFeed(): return Response(gen(), mimetype='multipart/x-mixed-replace; boundary=frame') app.run(host='0.0.0.0')
def gen(index): while True: t = time.time() if t - times[index] > 0.03: # print 1/(t-times[index]) times[index] = t camera = cameras[cameraStrings[index]] frame = None if camera == None or cameraStrings[index] == "Off": yield (b'--frame\r\n'b'Content-Type: image/png\r\n\r\n' + logo + b'\r\n') else: frame = cameras[cameraStrings[index]].read() frame,data,ret,mask = visionFiles[index].calculateFrame(cameras[cameraStrings[index]]) if len(data) > 0: socket.send(data) if masks[index]: frame = mask frame = cv2.imencode('.jpg',frame,[int(IMWRITE_JPEG_QUALITY),50])[1].tostring() yield (b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') #default route
def write_lmdb(db_path, list_filename, height, width): map_size = 9999999999 db = lmdb.open(db_path, map_size=map_size) writer = db.begin(write=True) datum = caffe.proto.caffe_pb2.Datum() for index, line in enumerate(open(list_filename, 'r')): img_filename, label = line.strip().split(' ') img = cv2.imread(img_filename, 1) img = cv2.resize(img, (height, width)) _, img_jpg = cv2.imencode('.jpg', img) datum.channels = 3 datum.height = height datum.width = width datum.label = int(label) datum.encoded = True datum.data = img_jpg.tostring() datum_byte = datum.SerializeToString() index_byte = '%010d' % index writer.put(index_byte, datum_byte, append=True) writer.commit() db.close()
def gen_normal(): while 1: if len(latest_video_frame) > 0: # if we have started receiving actual frames: # convert the latest read video frame to jpg format: ret, jpg = cv2.imencode(".jpg", latest_video_frame) # get the raw data bytes of the jpg image: (convert to binary) frame = jpg.tobytes() # yield ('return') the frame: (yield: returns value and saves the current state of the generator function, the next time this generator function is called execution will resume on the next line of code in the function (ie it will in this example start a new cycle of the while loop and yield a new frame)) # what we yield looks like this, but in binary: (binary data is a must for multipart) # --frame # Content-Type: image/jpeg # # <frame data> # yield (b'--frame\nContent-Type: image/jpeg\n\n' + frame + b'\n')
def binary_encoder(data): ## Input ## data: dict ## 'id': sample ID(usually name of image file) ## 'image': path to image file ## 'objects': dict ## 'bbox': bounding box coordinate of object ## 'label': label of object ## Output ## string: encoded list ## [id, encoded_image, labels, nbboxes] ## encode image image = cv2.imread(data['image']) _, encoded_image = cv2.imencode('.jpg', image) data['image'] = encoded_image return pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
def _returnfaces(self, request): ''' returnes the processed images with the detected artifacts highlighted ''' try: image = yield from self._getlastimage() image_buf = cv2.imencode('.jpg', image)[1] image_str = np.array(image_buf).tostring() except asyncio.QueueEmpty as qe: msg = 'QueueEmpty exception has been thrown. There is no image ' \ 'with some recognized artifacts in the queue right now.' self._logger.warning(msg) return Response( text=msg, status=500, content_type='application/json' ) return Response( body=image_str, status=200, content_type='image/jpeg' )
def gen2(): # from PIL import Image # import io import cv2 #im = Image.fromarray(A) while True: frame = get_frame() frame = cv2.imencode('.jpg', frame)[1].tostring() # print type(frame),frame # frame = Image.fromarray(frame) # frame = frame.tostring() # print(type(frame), frame) # yield (frame) yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
def encode(self, frame): return cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, self.quality])[1]
def publish_image_t(pub_channel, im, jpeg=False, flip_rb=True): global g_viz_pub out = image_t() # Populate appropriate fields h,w = im.shape[:2] c = 3 out.width, out.height = w, h out.row_stride = w*c out.utime = 1 # Propagate encoded/raw data, image = cv2.cvtColor(im, cv2.COLOR_GRAY2BGR) if im.ndim == 2 else im if flip_rb and im.ndim == 3: rarr, barr = image[:,:,2].copy(), image[:,:,0].copy() image[:,:,0], image[:,:,2] = rarr, barr # Propagate appropriate encoding if jpeg: out.pixelformat = image_t.PIXEL_FORMAT_MJPEG else: out.pixelformat = image_t.PIXEL_FORMAT_RGB out.data = cv2.imencode('.jpg', image)[1] if jpeg else image.tostring() out.size = len(out.data) out.nmetadata = 0 # Pub g_viz_pub.lc.publish(pub_channel, out.encode())
def send_image(s, im, scale=1.0, encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]): # Will not upsample image for bandwidth reasons if scale < 1: im = cv2.resize(im, None, fx=scale, fy=scale) result, imgencode = cv2.imencode('.jpg', im, encode_param) data = np.array(imgencode) stringData = data.tostring() s.send( str(len(stringData)).ljust(16)) s.send( stringData )
def update_socket(ws): print 'websocket connection request' state['draw_output'] = True while not ws.closed: new_data_condition.acquire() new_data_condition.wait() new_data_condition.release() result = { 'targets': state['targets'], 'fps': state['fps'], 'connected': state['ack'] } _, binframe = cv2.imencode('.jpg', state['output_images']['bin']) result['binaryImg'] = base64.b64encode(binframe) _, binframe = cv2.imencode('.jpg', state['output_images']['result']) result['resultImg'] = base64.b64encode(binframe) message = json.dumps(result) ws.send(message) received = json.loads(ws.receive()) if 'thresholds' in received: config['target'] = received['thresholds'] save_config(config) if 'camera' in received: config['camera'] = received['camera'] save_config(config) print 'websocket disconnected' state['draw_output'] = False
def save(self,output_path,output_im): ''' ???? ''' cv2.imencode('.jpg',output_im)[1].tofile(output_path) # ============================================================================= # cv2.imwrite(os.path.abspath(output_path.encode('utf-8').decode('gbk')), output_im) # =============================================================================
def do_tarfile_save(self, tf): """ Write images and calibration solution to a tarfile object """ ims = ([("left-%04d.png" % i, im) for i,(_, im, _) in enumerate(self.db)] + [("right-%04d.png" % i, im) for i,(_, _, im) in enumerate(self.db)]) def taradd(name, buf): s = StringIO(buf) ti = tarfile.TarInfo(name) ti.size = len(s.getvalue()) ti.uname = 'calibrator' ti.mtime = int(time.time()) tf.addfile(tarinfo=ti, fileobj=s) for (name, im) in ims: taradd(name, cv2.imencode(".png", im)[1].tostring()) taradd('left.yaml', self.yaml("/left", self.l)) taradd('right.yaml', self.yaml("/right", self.r)) taradd('ost.txt', self.ost())
def passzbar(image): # convert to bmp binary so that zbar can handle it retval, buf = cv2.imencode('.bmp', image) if retval == False: raise ValueError('The Given image could not be converted to BMP binary data') # convert buf from numpy.ndarray to bytes binbmp = buf.tostring() optionargs = [] args = [ ZBARIMG, ':-', '-q' ] + optionargs p = subprocess.Popen( args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False ) stdout, stderr = p.communicate(input=binbmp) if len(stderr) == 0: bindata = stdout else: raise RuntimeError('ZBar threw error:\n' + stderr.decode('utf-8')) t = bindata.split(b":", 1) #print(t) type = None data = None if len(t) == 2: type, data = t return type, data
def passpotrace(image, optionargs=[]): # potrace supports only pnm (pbm, pgm, ppm), bmp # and cv2.imencode() supports all of them. # convert to bmp binary so that potrace can handle it retval, buf = cv2.imencode('.bmp', image) if retval == False: raise ValueError('The Given image could not be converted to BMP binary data') # convert buf from numpy.ndarray to bytes binbmp = buf.tostring() #optionargs = [] args = [ POTRACE, '-', '-o-', '--svg' ] + optionargs p = subprocess.Popen( args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False ) stdout, stderr = p.communicate(input=binbmp) if len(stderr) == 0: binsvg = stdout else: raise RuntimeError('Potrace threw error:\n' + stderr.decode('utf-8')) return binsvg
def get_face_properties(self, images, timeout): """ Returns a SkyFace detections list based on a list of images :param images: List of input images (Faces) :param timeout: Request timeout :return: The SkyFaces with their properties """ buffers = [cv2.imencode('.jpg', image)[1].tostring() for image in images] try: response = self._external_request_with_timeout(buffers, timeout) except Exception as e: raise Exception("Skybiometry API call failed:", e) if not "photos" in response: raise Exception("Skybiometry API call, 'photos' not found in response:", response) photos = response["photos"] if len(photos) != len(buffers): raise Exception("Skybiometry API call, result length != images length:", response) fps = [] for photo in photos: attrs = photo["tags"][0]["attributes"] fp = SkyFaceProperties() for name, attr in attrs.iteritems(): if hasattr(fp, name): setattr(fp, name, Attribute(attr["value"], attr["confidence"] / 100.0)) fps.append(fp) return fps
def to_pillow(image): return Image.fromarray(image[:, :, ::-1].copy()) # There is another way # img_bytes = cv2.imencode('.png', image)[1].tostring() # return Image.open(BytesIO(img_bytes))
def image_to_file(image): f = StringIO() ret, buf = cv2.imencode('.jpg', image) f.write(np.array(buf).tostring()) f.seek(0) return f
def videoToFrame(videoName): print(videoName) cap = cv2.VideoCapture(videoName) flag, frame = cap.read() global frameCnt skipCnt = 0 width, height = np.shape(frame)[1], np.shape(frame)[0] while flag: for i in range(segXNum): for j in range(segYNum): frame2 = frame[j * height // segYNum : (j + 1) * height // segYNum, i * width // segXNum : (i + 1) * width // segXNum] #get ROI #???????????? cv2.imencode('.jpg', frame2)[1].tofile("E:\\traffic\\?????\\???\\image\\" + '{:0>6}'.format(str(frameCnt + 1)) + '.jpg') #cv2.imwrite("E:\\traffic\\?????\\???\\image\\" + '{:0>6}'.format(str(frameCnt // skipFrameNum + 1)) + '.jpg', frame) frameCnt += 1 skipCnt += 1 cap.set(1, skipCnt * skipFrameNum) flag, frame = cap.read() return
def resizing(img): height, width, channels = img.shape if max(height, width) > 100: ratio = float(height) / width new_width = 100 / ratio img_resized = cv2.resize(img, (int(new_width), 100)) ip_convert = cv2.imencode('.png', img_resized) else: ip_convert = cv2.imencode('.png', img) return ip_convert
def removebg(segmented_img): src = cv2.imdecode(np.squeeze(np.asarray(segmented_img[1])), 1) tmp = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY) _, alpha = cv2.threshold(tmp, 0, 255, cv2.THRESH_BINARY) b, g, r = cv2.split(src) rgba = [b, g, r, alpha] dst = cv2.merge(rgba, 4) processed_img = cv2.imencode('.png', dst) return processed_img
def to_pillow(image): return Image.fromarray(image[:, :, ::-1].copy()) # There is another way # img_bytes = cv2.imencode('.png', image)[1].tostring() # return Image.open(StringIO(img_bytes))
def save_inp_image(self, img, group): img_str = cv2.imencode('.png', img)[1] self.save('input', img_str, group)
def save_full_image(self, img, group): img_str = cv2.imencode('.png', img)[1] self.save('input_full_size', img_str, group)
def save_seg(self, seg_id, seg, group): seg_str = cv2.imencode('.png', seg)[1] key = 'label_segmentation/{:02d}'.format(seg_id) self.save(key, seg_str, group)
def save_ori(self, ori, group): ori_str = cv2.imencode('.png', ori)[1] self.save('orientation', ori_str, group)
def save_full_seg(self, seg_id, seg, group): seg_str = cv2.imencode('.png', seg)[1] key = 'label_segmentation_full_size/{:02d}'.format(seg_id) self.save(key, seg_str, group)
def save_full_sem_seg(self, cls_id, seg, group): seg_str = cv2.imencode('.png', seg)[1] key = 'label_semantic_segmentation_full_size/{:02d}'.format(cls_id) self.save(key, seg_str, group)
def write_log(self, results): """Process results Args: results: y_out, s_out """ inp = self._batch y_out = results['y_out'] d_out = results['d_out'] with h5py.File(self.dataset.h5_fname, 'r+') as h5f: for ii in xrange(y_out.shape[0]): idx = inp['idx_map'][ii] group = h5f[self.dataset.get_str_id(idx)] if 'foreground_pred' in group: del group['foreground_pred'] if 'orientation_pred' in group: del group['orientation_pred'] for cl in range(y_out.shape[3]): y_out_arr = y_out[ii, :, :, cl] y_out_arr = (y_out_arr * 255).astype('uint8') y_out_str = cv2.imencode('.png', y_out_arr)[1] group['foreground_pred/{:02d}'.format(cl)] = y_out_str for ch in range(d_out.shape[3]): d_out_arr = d_out[ii, :, :, ch] d_out_arr = (d_out_arr * 255).astype('uint8') d_out_str = cv2.imencode('.png', d_out_arr)[1] group['orientation_pred/{:02d}'.format(ch)] = d_out_str
def _captureSingleImage(self): Return, Frame = self._VideoStream.read() FrameImage = cv2.cvtColor(Frame, cv2.COLOR_BGR2RGBA) FrameImage = cv2.resize(FrameImage, self._getLabelSize(self._VideoLabel, 480/320), interpolation = cv2.INTER_CUBIC) ArrayImage = image.fromarray(FrameImage) #ArrayImage = ArrayImage.resize(, image.ANTIALIAS) Image = imagetk.PhotoImage(image=ArrayImage) self._VideoLabel.imgtk = Image self._VideoLabel.configure(image=Image) #cv2.imencode(".jpg", Frame) return(cv2.imencode('.jpg', Frame)[1].tostring())
def save_inp_image(self, img, group): img_str = cv2.imencode(".png", img)[1] self.save("input", img_str, group) pass
def save_full_image(self, img, group): img_str = cv2.imencode(".png", img)[1] self.save("input_full", img_str, group)
def save_ori(self, ori, group): ori_str = cv2.imencode(".png", ori)[1] self.save("label_angle", ori_str, group) pass
def save_full_seg(self, seg_id, seg, group): seg_str = cv2.imencode(".png", seg)[1] key = "label_ins_seg_full/{:03d}".format(seg_id) self.save(key, seg_str, group) pass
def save_full_sem_seg(self, cls_id, seg, group): seg_str = cv2.imencode(".png", seg)[1] key = "label_sem_seg_full/{:03d}".format(cls_id) self.save(key, seg_str, group) pass
def current_img_b64(): b64 = None frame = current_img() if frame != None: png = cv2.imencode('.png', frame)[1] b64 = base64.encodestring(png) return b64 # GET /see # returns the current image
def _convert_jpg(self, image_msg): retval, buf = imencode('.jpg', image_msg) return buf.tostring()