我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用picamera.array()。
def __init__(self, res_width=96, res_height=96): self.camera = picamera.PiCamera(resolution=(res_width, res_height)) # TODO propagate configurable resolution through '96' logic below self.camera.hflip = True self.camera.vflip = True self.res_width = res_width self.res_height = res_height self.stream = picamera.array.PiYUVArray(self.camera) self.pixelObjList = [] self.object_id_center = 0 self.pixelObjList.append(PixelObject(self.next_obj_id())) self.max_pixel_count = 0 self.largest_object_id = 0 self.largest_X = 0 self.largest_Y = 0 self.filename = ''
def run(self): # This method runs in a separate thread global processingPool global lock while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and do some processing on it self.stream.seek(0) self.ProcessImage(self.stream.array) finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Return ourselves to the processing pool with lock: processingPool.append(self) # Image processing function
def run(self): # This method runs in a separate thread while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and do some processing on it self.stream.seek(0) self.ProcessImage(self.stream.array) finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Image processing function
def bytes_to_yuv(data, resolution): """ Converts a bytes object containing YUV data to a `numpy`_ array. """ width, height = resolution fwidth, fheight = raw_resolution(resolution) y_len = fwidth * fheight uv_len = (fwidth // 2) * (fheight // 2) if len(data) != (y_len + 2 * uv_len): raise PiCameraValueError( 'Incorrect buffer length for resolution %dx%d' % (width, height)) # Separate out the Y, U, and V values from the array a = np.frombuffer(data, dtype=np.uint8) Y = a[:y_len] U = a[y_len:-uv_len] V = a[-uv_len:] # Reshape the values into two dimensions, and double the size of the # U and V values (which only have quarter resolution in YUV4:2:0) Y = Y.reshape((fheight, fwidth)) U = U.reshape((fheight // 2, fwidth // 2)).repeat(2, axis=0).repeat(2, axis=1) V = V.reshape((fheight // 2, fwidth // 2)).repeat(2, axis=0).repeat(2, axis=1) # Stack the channels together and crop to the actual resolution return np.dstack((Y, U, V))[:height, :width]
def flush(self): super(PiBayerArray, self).flush() self._demo = None data = self.getvalue()[-6404096:] if data[:4] != b'BRCM': raise PiCameraValueError('Unable to locate Bayer data at end of buffer') # Strip header data = data[32768:] # Reshape into 2D pixel values data = np.frombuffer(data, dtype=np.uint8).\ reshape((1952, 3264))[:1944, :3240] # Unpack 10-bit values; every 5 bytes contains the high 8-bits of 4 # values followed by the low 2-bits of 4 values packed into the fifth # byte data = data.astype(np.uint16) << 2 for byte in range(4): data[:, byte::5] |= ((data[:, 4::5] >> ((4 - byte) * 2)) & 3) data = np.delete(data, np.s_[4::5], 1) # XXX Should test camera's vflip and hflip settings here and adjust self.array = np.zeros(data.shape + (3,), dtype=data.dtype) self.array[1::2, 0::2, 0] = data[1::2, 0::2] # Red self.array[0::2, 0::2, 1] = data[0::2, 0::2] # Green self.array[1::2, 1::2, 1] = data[1::2, 1::2] # Green self.array[0::2, 1::2, 2] = data[0::2, 1::2] # Blue
def run(self): global lastFrame global lockFrame # This method runs in a separate thread while not self.terminated: # Wait for an image to be written to the stream if self.event.wait(1): try: # Read the image and save globally self.stream.seek(0) flippedArray = cv2.flip(self.stream.array, -1) # Flips X and Y retval, thisFrame = cv2.imencode('.jpg', flippedArray) del flippedArray lockFrame.acquire() lastFrame = thisFrame lockFrame.release() finally: # Reset the stream and event self.stream.seek(0) self.stream.truncate() self.event.clear() # Image capture thread
def uploader(file_Queue, db): while True: filename, foreground = file_Queue.get() # make sure filename extension is mp4 filename = filename[:-3] + 'mp4' upload_to_s3(BUCKET_NAME, "videos/"+filename[8:], filename[8:]) #print filename + ", " + filename[8+len(RPiName)+1:-4] # extract the timestamp from the filename timestamp = float(filename[8+len(RPiName)+1:-4]) # convert float array to strings as needed for Dynamo fg_data = {'data': [str(item) for item in foreground]} db.create_item(RPiName, BUCKET_NAME, 'videos/'+filename[8:], timestamp, fg_data) # delete the files once we've uploaded them os.remove(filename[8:]) os.remove(filename[8:-3]+"motion") return
def take_photo_at(self, camera_centre): with picamera.PiCamera() as camera: camera.resolution = config.CAMERA_RESOLUTION camera.framerate = 24 with picamera.array.PiRGBArray(camera) as output: camera.capture(output, 'bgr', use_video_port=True) outputarray = output.array # Rotate image to oriented it with paper. outputarray = np.rot90(outputarray, 3) # Save photo. filename = datetime.datetime.now().strftime("%M%S.%f_") + \ str(camera_centre[0]) \ + '_' \ + str(camera_centre[1]) + '_Photo_' + str(self._photo_index) + '.jpg' cv2.imwrite(os.path.join(config.debug_output_folder, filename), outputarray) self._photo_index += 1 return outputarray
def capture_images(): global count global frame global camera_resolution with picamera.PiCamera() as camera: camera_resolution = camera.resolution camera.shutter_speed = 100 time.sleep(0.5) #Shutter speed is not set instantly. This wait allows time for changes to take effect. log.info("Initialized camera") max_frames = args.max_frames with picamera.array.PiRGBArray(camera) as stream: for index, foo in enumerate(camera.capture_continuous(stream, format="bgr", use_video_port=True)): if stopped is True: return count = index log.info("Captured image. Starting to process...") stream.seek(0) stream.truncate() frame = stream.array log.debug("Converted data to array")
def picamera(): # Picamera will raise an OSError when importing unless the code is run # on an RPI due to firmware dependencies. Make the imports local so we # won't have to deal with this nuance when not calling this method. import picamera import picamera.array with picamera.PiCamera() as camera: camera.resolution = (640, 480) time.sleep(0.1) # allow the camera time to warm up with picamera.array.PiRGBArray(camera, size=(640, 480)) as stream: if camera.closed: raise CameraInitializationError('Camera failed to open.') for frame in camera.capture_continuous(stream, format='bgr', use_video_port=True): image = frame.array yield image # Clear the stream in preparation for the next frame stream.truncate(0) # TODO documentation
def update(self): # keep looping infinitely until the thread is stopped stream = io.BytesIO() for f in self.stream: # grab the frame from the stream and clear the stream in # preparation for the next frame self.frame = f.array # print type(f) , f self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread # and resource camera resources if self.stopped: self.stream.close() self.rawCapture.close() self.camera.close() return
def update3(self): # keep looping infinitely until the thread is stopped stream = io.BytesIO() with picamera.PiCamera() as camera: for f in camera.capture_continuous(stream, format='jpeg'): # self.stream: # grab the frame from the stream and clear the stream in # preparation for the next frame self.frame = f.array self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread # and resource camera resources if self.stopped: self.stream.close() self.rawCapture.close() self.camera.close() return
def capture_frame(self): self.stream = picamera.array.PiYUVArray(self.camera) self.camera.capture(self.stream, 'yuv') self.camera._set_led(True) self.pixelObjList = [] self.object_id_center = 0 self.pixelObjList.append(PixelObject(self.next_obj_id())) rows = [] for _ in range(self.res_height): rows.append(range(self.res_width)) # flip image horizontally for j, j_ in enumerate(range(self.res_width-1, -1, -1)): # now flip vertically for i, i_ in enumerate(range(self.res_height-1, -1, -1)): rows[j][i] = self.stream.array[j_][i_][0] self.filename = self.save_PNG('raw.png', rows) self.spread_white_pixels( self.make_black_and_white( self.fuse_horizontal_and_vertical( self.get_horizontal_edges(rows), self.get_vertical_edges(rows))) )
def __init__(self): super(StreamProcessor, self).__init__() self.stream = picamera.array.PiRGBArray(camera) self.event = threading.Event() self.terminated = False self.start() self.begin = 0
def ProcessImage(self, image): global autoMode # Get the red section of the image image = cv2.medianBlur(image, 5) image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV) # Swaps the red and blue channels! red = cv2.inRange(image, numpy.array((115, 127, 64)), numpy.array((125, 255, 255))) # Find the contours contours,hierarchy = cv2.findContours(red, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) # Go through each contour foundArea = -1 foundX = -1 foundY = -1 for contour in contours: x,y,w,h = cv2.boundingRect(contour) cx = x + (w / 2) cy = y + (h / 2) area = w * h if foundArea < area: foundArea = area foundX = cx foundY = cy if foundArea > 0: ball = [foundX, foundY, foundArea] else: ball = None # Set drives or report ball status if autoMode: self.SetSpeedFromBall(ball) else: if ball: print 'Ball at %d,%d (%d)' % (foundX, foundY, foundArea) else: print 'No ball' # Set the motor speed from the ball position
def bytes_to_rgb(data, resolution): """ Converts a bytes objects containing RGB/BGR data to a `numpy`_ array. """ width, height = resolution fwidth, fheight = raw_resolution(resolution) if len(data) != (fwidth * fheight * 3): raise PiCameraValueError( 'Incorrect buffer length for resolution %dx%d' % (width, height)) # Crop to the actual resolution return np.frombuffer(data, dtype=np.uint8).\ reshape((fheight, fwidth, 3))[:height, :width, :]
def __init__(self, camera, size=None): super(PiArrayOutput, self).__init__() self.camera = camera self.size = size self.array = None
def close(self): super(PiArrayOutput, self).close() self.array = None
def flush(self): super(PiRGBArray, self).flush() self.array = bytes_to_rgb(self.getvalue(), self.size or self.camera.resolution)
def rgb_array(self): if self._rgb is None: # Apply the standard biases YUV = self.array.copy() YUV[:, :, 0] = YUV[:, :, 0] - 16 # Offset Y by 16 YUV[:, :, 1:] = YUV[:, :, 1:] - 128 # Offset UV by 128 # YUV conversion matrix from ITU-R BT.601 version (SDTV) # Y U V M = np.array([[1.164, 0.000, 1.596], # R [1.164, -0.392, -0.813], # G [1.164, 2.017, 0.000]]) # B # Calculate the dot product with the matrix to produce RGB output, # clamp the results to byte range and convert to bytes self._rgb = YUV.dot(M.T).clip(0, 255).astype(np.uint8) return self._rgb
def flush(self): super(PiMotionArray, self).flush() width, height = self.size or self.camera.resolution cols = ((width + 15) // 16) + 1 rows = (height + 15) // 16 b = self.getvalue() frames = len(b) // (cols * rows * motion_dtype.itemsize) self.array = np.frombuffer(b, dtype=motion_dtype).reshape((frames, rows, cols))
def analyse(self, array): """ Stub method for users to override. """ raise NotImplementedError
def __init__(self): super(StreamProcessor, self).__init__() self.stream = picamera.array.PiRGBArray(camera) self.event = threading.Event() self.terminated = False self.start()
def trans_per(self, image): image = self.binary_extraction(image) self.binary_image = image ysize = image.shape[0] xsize = image.shape[1] # define region of interest left_bottom = (xsize/10, ysize) apex_l = (xsize/2 - 2600/(self.look_ahead**2), ysize - self.look_ahead*275/30) apex_r = (xsize/2 + 2600/(self.look_ahead**2), ysize - self.look_ahead*275/30) right_bottom = (xsize - xsize/10, ysize) # define vertices for perspective transformation src = np.array([[left_bottom], [apex_l], [apex_r], [right_bottom]], dtype=np.float32) dst = np.float32([[xsize/3,ysize],[xsize/4.5,0],[xsize-xsize/4.5,0],[xsize-xsize/3, ysize]]) self.M = cv2.getPerspectiveTransform(src, dst) self.Minv = cv2.getPerspectiveTransform(dst, src) if len(image.shape) > 2: warped = cv2.warpPerspective(image, self.M, image.shape[-2:None:-1], flags=cv2.INTER_LINEAR) else: warped = cv2.warpPerspective(image, self.M, image.shape[-1:None:-1], flags=cv2.INTER_LINEAR) return warped # creat window mask for lane detecion
def __init__(self): self.camera = picamera.PiCamera() self.camera.vflip = True self.camera.hflip = True #self.camera.resolution = (320, 160) self.camera.start_preview() sleep(5) self.stream = picamera.array.PiYUVArray(self.camera)
def capture(self): self.camera.capture(self.stream, format='yuv') img = self.stream.array[270:,:,0] self.stream.seek(0) self.stream.truncate() return img
def cameraReader(cam_writer_frames_Queue, cam_liveWeb_frame_Queue): camera = picamera.PiCamera() camera.resolution = (320, 240) camera.framerate = FPS stream = picamera.array.PiRGBArray(camera) while True: FRAMES = list() t1 = time.time() startTime = time.time() for c in xrange(FRAMES_PER_CLIP): frameTimestamp = time.asctime() + ' ' + time.tzname[0] camera.capture(stream, format='bgr', use_video_port=True) frame = stream.array if cam_liveWeb_frame_Queue.full() == False: cam_liveWeb_frame_Queue.put(frame, block=False) FRAMES.append((frameTimestamp, frame)) stream.truncate(0) print "Camera Capture", time.time() - t1 # Sending frame to processing process cam_writer_frames_Queue.put((startTime, FRAMES)) del FRAMES return camera.close()
def getMeanCenters(lastMeanCenter, CENTERS): MEANCENTERS = list() MEANCENTERS.append(lastMeanCenter) for i in xrange(FPS): chunk = np.array(CENTERS[i*FPS:(i+1)*FPS]) meanCenter = list(np.mean(chunk, axis=0).astype(int)) MEANCENTERS.append(meanCenter) return MEANCENTERS
def update(self): # keep looping infinitely until the thread is stopped for frameBuf in self.stream: # grab the frame from the stream and clear the stream in # preparation for the next frame self.frame = np.rot90(frameBuf.array) self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread # and resource camera resources if self.stopped: self.camera.led = False self.stream.close() self.rawCapture.close() self.camera.close() return
def update(self): self.stream_fps.start() # keep looping infinitely until the thread is stopped for frameBuf in self.stream: self.stream_fps.update() # grab the frame from the stream and clear the stream in preparation for the next frame self.frame = np.rot90(frameBuf.array) self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread if self.running == False: self.camera.led = False return
def update(self): # keep looping infinitely until the thread is stopped for frameBuf in self.stream: # grab the frame from the stream and clear the stream in preparation for the next frame self.frame = np.rot90(frameBuf.array) self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread if self.running == False: self.camera.led = False return
def update(self): # keep looping infinitely until the thread is stopped for frameBuf in self.stream: # grab the frame from the stream and clear the stream in preparation for the next frame self.frame = frameBuf.array self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread if self.running == False: return
def update(self): # keep looping infinitely until the thread is stopped for frameBuf in self.stream: # grab the frame from the stream and clear the stream in preparation for the next frame self.frame = frameBuf.array self.rawCapture.truncate(0) # if the thread indicator variable is set, stop the thread if self.running == False: self.camera.led = False return
def resolution_pixels_xy(self): """The size of a photo in pixels""" return np.array(config.CAMERA_RESOLUTION)
def pixels_to_mm_scale_factors_xy(self): return np.array([config.X_PIXELS_TO_MILLIMETRE_SCALE, config.Y_PIXELS_TO_MILLIMETRE_SCALE])
def capture_images(): """ print "Staring capture thread" global frame while True: print "Attempting capture" (grabbed, f) = stream.read() if grabbed: print "Captured" lock.acquire() frame = imutils.resize(f, width=resolution[0], height=resolution[1]) lock.release() """ print "started capturing thread" global frame with picamera.PiCamera() as camera: camera.resolution = resolution camera.shutter_speed = 250 time.sleep(0.5) # Shutter speed is not set instantly. This wait allows time for changes to take effect. print "Initialized camera..." with picamera.array.PiRGBArray(camera) as stream: for foo in camera.capture_continuous(stream, format="bgr", use_video_port=True): print "Captured an image" stream.seek(0) stream.truncate() lock.acquire() frame = stream.array lock.release() print "Converted image data to array"
def __init__(self, cam, height, width, cropAroundOffset=False, proccessingHeight=0, proccessingWidth=0): self.camera = cam self.stream = picamera.array.PiYUVArray(self.camera) # When this boolean is True, The Frame taken will be cropped down to a 96x96 image for proccessing # the center of the cropped down image will be the expected position of the object self.cropAroundOffset = cropAroundOffset self.height = height self.width = width if self.cropAroundOffset == False: self.proccessingHeight = self.height self.proccessingWidth = self.width else: self.proccessingHeight = proccessingHeight self.proccessingWidth = proccessingWidth self.camera.resolution = (self.height, self.width) self.pixelObjList = [] self.objIDCntr = 0 self.pixelObjList.append(PixelObj.PixelObj(self.getNextObjId(), self.proccessingHeight)) # - - - - - - - - - - - - - - - - # - - - GET NEXT OBJ ID - - - - - # - - - - - - - - - - - - - - - -
def captureFrame(self, offset = None, folderName = 0): self.folderName = folderName self.stream = picamera.array.PiYUVArray(self.camera) self.camera.capture(self.stream, 'yuv') self.camera._set_led(True) self.pixelObjList = [] self.objIDCntr = 0 self.pixelObjList.append(PixelObj.PixelObj(self.getNextObjId(), self.proccessingHeight)) rows = [] for _ in range(self.height): rows.append(range(self.width)) for j, j_ in enumerate(range(self.width - 1,-1,-1)): #flip image horizontally for i, i_ in enumerate(range(self.height - 1, -1, -1)): #flip vertically rows[j][i] = self.stream.array[j_][i_][0] # We need to crop the image to a 96 x 96 image if the cropAroundOffset value it True if self.cropAroundOffset and offset is not None: croppedRows = [] for _ in range(self.proccessingHeight): croppedRows.append(range(self.proccessingWidth)) for j in range(self.proccessingHeight): for i in range(self.proccessingWidth): croppedRows[j][i] = rows[j + offset.y][i + offset.x] rows = croppedRows self.savePNG('raw.png', rows) self.processFrame_5(self.processFrame_4(self.processFrame_3(self.processFrame_1(rows), self.processFrame_2(rows)))) # - - - - - - - - - - - - - - - - # - - - PROCESS FRAME 1 - - - - - # - - - - - - - - - - - - - - - -
def __init__(self, height, width): """Constructor""" self.camera = picamera.PiCamera() self.stream = picamera.array.PiYUVArray(self.camera) self.height = height self.width = width self.camera.resolution = (self.height, self.width) # - - - - - - - - - - - - - - - - # - - - - Take Picture - - - - - # - - - - - - - - - - - - - - - -
def demosaic(self): if self._demo is None: # XXX Again, should take into account camera's vflip and hflip here # Construct representation of the bayer pattern bayer = np.zeros(self.array.shape, dtype=np.uint8) bayer[1::2, 0::2, 0] = 1 # Red bayer[0::2, 0::2, 1] = 1 # Green bayer[1::2, 1::2, 1] = 1 # Green bayer[0::2, 1::2, 2] = 1 # Blue # Allocate output array with same shape as data and set up some # constants to represent the weighted average window window = (3, 3) borders = (window[0] - 1, window[1] - 1) border = (borders[0] // 2, borders[1] // 2) # Pad out the data and the bayer pattern (np.pad is faster but # unavailable on the version of numpy shipped with Raspbian at the # time of writing) rgb = np.zeros(( self.array.shape[0] + borders[0], self.array.shape[1] + borders[1], self.array.shape[2]), dtype=self.array.dtype) rgb[ border[0]:rgb.shape[0] - border[0], border[1]:rgb.shape[1] - border[1], :] = self.array bayer_pad = np.zeros(( self.array.shape[0] + borders[0], self.array.shape[1] + borders[1], self.array.shape[2]), dtype=bayer.dtype) bayer_pad[ border[0]:bayer_pad.shape[0] - border[0], border[1]:bayer_pad.shape[1] - border[1], :] = bayer bayer = bayer_pad # For each plane in the RGB data, construct a view over the plane # of 3x3 matrices. Then do the same for the bayer array and use # Einstein summation to get the weighted average self._demo = np.empty(self.array.shape, dtype=self.array.dtype) for plane in range(3): p = rgb[..., plane] b = bayer[..., plane] pview = as_strided(p, shape=( p.shape[0] - borders[0], p.shape[1] - borders[1]) + window, strides=p.strides * 2) bview = as_strided(b, shape=( b.shape[0] - borders[0], b.shape[1] - borders[1]) + window, strides=b.strides * 2) psum = np.einsum('ijkl->ij', pview) bsum = np.einsum('ijkl->ij', bview) self._demo[..., plane] = psum // bsum return self._demo
def project_on_road(self, image_input): image = image_input[self.remove_pixels:, :] image = self.trans_per(image) self.im_shape = image.shape self.get_fit(image) if self.detected_first & self.detected: # create fill image temp_filler = np.zeros((self.remove_pixels,self.im_shape[1])).astype(np.uint8) filler = np.dstack((temp_filler,temp_filler,temp_filler)) # create an image to draw the lines on warp_zero = np.zeros_like(image).astype(np.uint8) color_warp = np.dstack((warp_zero, warp_zero, warp_zero)) ploty = np.linspace(0, image_input.shape[0]-1, image_input.shape[0] ) left_fitx = self.best_fit_l[0]*ploty**2 + self.best_fit_l[1]*ploty + self.best_fit_l[2] right_fitx = self.best_fit_r[0]*ploty**2 + self.best_fit_r[1]*ploty + self.best_fit_r[2] # recast the x and y points into usable format for cv2.fillPoly() pts_left = np.array([np.transpose(np.vstack([left_fitx, ploty]))]) pts_right = np.array([np.flipud(np.transpose(np.vstack([right_fitx, ploty])))]) pts = np.hstack((pts_left, pts_right)) # draw the lane onto the warped blank image cv2.fillPoly(color_warp, np.int_([pts]), (0,255, 0)) # warp the blank back to original image space using inverse perspective matrix (Minv) newwarp = cv2.warpPerspective(color_warp, self.Minv, color_warp.shape[-2:None:-1]) left_right = cv2.warpPerspective(self.left_right, self.Minv, color_warp.shape[-2:None:-1]) # combine the result with the original image left_right_fill = np.vstack((filler,left_right)) result = cv2.addWeighted(left_right_fill,1, image_input, 1, 0) result = cv2.addWeighted(result, 1, np.vstack((filler,newwarp)), 0.3, 0) # get curvature and offset self.calculate_curvature_offset() # plot text on resulting image img_text = "radius of curvature: " + str(round((self.left_curverad + self.right_curverad)/2,2)) + ' (m)' if self.offset< 0: img_text2 = "vehicle is: " + str(round(np.abs(self.offset),2)) + ' (m) left of center' else: img_text2 = "vehicle is: " + str(round(np.abs(self.offset),2)) + ' (m) right of center' result2 = cv2.resize(result, (0,0), fx=self.enlarge, fy=self.enlarge) cv2.putText(result2,img_text, (15,15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255),1) cv2.putText(result2,img_text2,(15,40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255),1) return result2 # if lanes were not detected output source image else: return cv2.resize(image_input,(0,0), fx=self.enlarge, fy=self.enlarge)