/**
 * Obtain the next DomainPeer to try. While cache retries remain, a peer from
 * the client's peer cache is preferred; otherwise a new domain socket is
 * opened through the DomainSocketFactory.
 *
 * @return a BlockReaderPeer whose second constructor argument records whether
 *         the peer came from the cache, or null if no cached peer was found
 *         and the factory could not create a socket.
 */
private BlockReaderPeer nextDomainPeer() {
  boolean mayUseCache = remainingCacheTries > 0;
  if (mayUseCache) {
    Peer cached = clientContext.getPeerCache().get(datanode, true);
    if (cached != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + cached);
      }
      return new BlockReaderPeer(cached, true);
    }
  }
  DomainSocket freshSocket = clientContext.getDomainSocketFactory()
      .createSocket(pathInfo, conf.socketTimeout);
  return (freshSocket == null)
      ? null
      : new BlockReaderPeer(new DomainPeer(freshSocket), false);
}
/**
 * Allocate a shared memory slot for a block on the given DataNode.
 *
 * The per-DataNode EndpointShmManager is looked up (and created on first
 * use) while holding the manager lock, and the allocation is delegated to it.
 *
 * @param datanode   the DataNode to allocate the slot from
 * @param peer       the peer to use when talking to the DataNode
 * @param usedPeer   (out param) passed through to the per-DataNode manager,
 *                   which sets it when it takes ownership of the peer
 * @param blockId    the block the slot is for
 * @param clientName the client name
 * @return null if this manager has been closed; otherwise the result of the
 *         per-DataNode allocation
 * @throws IOException if the per-DataNode allocation fails
 */
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      // Guarded so the message is only built when trace logging is on.
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager is closed.");
      }
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    // Note: the per-endpoint method takes (clientName, blockId), which is
    // the reverse of this method's parameter order.
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
/**
 * Obtain the next DomainPeer to try. A cached peer is consulted only while
 * cache retries remain; otherwise, or on a cache miss, a new domain socket is
 * created using the configured socket timeout.
 *
 * @return a BlockReaderPeer flagged with whether it was taken from the cache,
 *         or null if neither a cached peer nor a new socket was available.
 */
private BlockReaderPeer nextDomainPeer() {
  Peer reused = null;
  if (remainingCacheTries > 0) {
    reused = clientContext.getPeerCache().get(datanode, true);
  }
  if (reused != null) {
    LOG.trace("nextDomainPeer: reusing existing peer {}", reused);
    return new BlockReaderPeer(reused, true);
  }
  DomainSocket socket = clientContext.getDomainSocketFactory()
      .createSocket(pathInfo, conf.getSocketTimeout());
  if (socket == null) {
    return null;
  }
  return new BlockReaderPeer(new DomainPeer(socket), false);
}
/** * Get a RemoteBlockReader that communicates over a UNIX domain socket. * * @return The new BlockReader, or null if we failed to create the block * reader. * * @throws InvalidToken If the block token was invalid. * Potentially other security-related execptions. */ private BlockReader getRemoteBlockReaderFromDomain() throws IOException { if (pathInfo == null) { pathInfo = clientContext.getDomainSocketFactory(). getPathInfo(inetSocketAddress, conf); } if (!pathInfo.getPathState().getUsableForDataTransfer()) { PerformanceAdvisory.LOG.debug(this + ": not trying to create a " + "remote block reader because the UNIX domain socket at " + pathInfo + " is not usable."); return null; } if (LOG.isTraceEnabled()) { LOG.trace(this + ": trying to create a remote block reader from the " + "UNIX domain socket at " + pathInfo.getPath()); } while (true) { BlockReaderPeer curPeer = nextDomainPeer(); if (curPeer == null) break; if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; BlockReader blockReader = null; try { blockReader = getRemoteBlockReader(peer); return blockReader; } catch (IOException ioe) { IOUtils.cleanup(LOG, peer); if (isSecurityException(ioe)) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": got security exception while constructing " + "a remote block reader from the unix domain socket at " + pathInfo.getPath(), ioe); } throw ioe; } if (curPeer.fromCache) { // Handle an I/O error we got when using a cached peer. These are // considered less serious, because the underlying socket may be stale. if (LOG.isDebugEnabled()) { LOG.debug("Closed potentially stale domain peer " + peer, ioe); } } else { // Handle an I/O error we got when using a newly created domain peer. // We temporarily disable the domain socket path for a few minutes in // this case, to prevent wasting more time on it. LOG.warn("I/O error constructing remote block reader. 
Disabling " + "domain socket " + peer.getDomainSocket(), ioe); clientContext.getDomainSocketFactory() .disableDomainSocketPath(pathInfo.getPath()); return null; } } finally { if (blockReader == null) { IOUtils.cleanup(LOG, peer); } } } return null; }
/** * Ask the DataNode for a new shared memory segment. This function must be * called with the manager lock held. We will release the lock while * communicating with the DataNode. * * @param clientName The current client name. * @param peer The peer to use to talk to the DataNode. * * @return Null if the DataNode does not support shared memory * segments, or experienced an error creating the * shm. The shared memory segment itself on success. * @throws IOException If there was an error communicating over the socket. * We will not throw an IOException unless the socket * itself (or the network) is the problem. */ private DfsClientShm requestNewShm(String clientName, DomainPeer peer) throws IOException { final DataOutputStream out = new DataOutputStream( new BufferedOutputStream(peer.getOutputStream())); new Sender(out).requestShortCircuitShm(clientName); ShortCircuitShmResponseProto resp = ShortCircuitShmResponseProto.parseFrom( PBHelper.vintPrefixed(peer.getInputStream())); String error = resp.hasError() ? resp.getError() : "(unknown)"; switch (resp.getStatus()) { case SUCCESS: DomainSocket sock = peer.getDomainSocket(); byte buf[] = new byte[1]; FileInputStream fis[] = new FileInputStream[1]; if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) { throw new EOFException("got EOF while trying to transfer the " + "file descriptor for the shared memory segment."); } if (fis[0] == null) { throw new IOException("the datanode " + datanode + " failed to " + "pass a file descriptor for the shared memory segment."); } try { DfsClientShm shm = new DfsClientShm(PBHelper.convert(resp.getId()), fis[0], this, peer); if (LOG.isTraceEnabled()) { LOG.trace(this + ": createNewShm: created " + shm); } return shm; } finally { IOUtils.cleanup(LOG, fis[0]); } case ERROR_UNSUPPORTED: // The DataNode just does not support short-circuit shared memory // access, and we should stop asking. 
LOG.info(this + ": datanode does not support short-circuit " + "shared memory access: " + error); disabled = true; return null; default: // The datanode experienced some kind of unexpected error when trying to // create the short-circuit shared memory segment. LOG.warn(this + ": error requesting short-circuit shared memory " + "access: " + error); return null; } }
/** * Allocate a new shared memory slot connected to this datanode. * * Must be called with the EndpointShmManager lock held. * * @param peer The peer to use to talk to the DataNode. * @param usedPeer (out param) Will be set to true if we used the peer. * When a peer is used * * @param clientName The client name. * @param blockId The block ID to use. * @return null if the DataNode does not support shared memory * segments, or experienced an error creating the * shm. The shared memory segment itself on success. * @throws IOException If there was an error communicating over the socket. */ Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, String clientName, ExtendedBlockId blockId) throws IOException { while (true) { if (closed) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": the DfsClientShmManager has been closed."); } return null; } if (disabled) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": shared memory segment access is disabled."); } return null; } // Try to use an existing slot. Slot slot = allocSlotFromExistingShm(blockId); if (slot != null) { return slot; } // There are no free slots. If someone is loading more slots, wait // for that to finish. if (loading) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": waiting for loading to finish..."); } finishedLoading.awaitUninterruptibly(); } else { // Otherwise, load the slot ourselves. loading = true; lock.unlock(); DfsClientShm shm; try { shm = requestNewShm(clientName, peer); if (shm == null) continue; // See #{DfsClientShmManager#domainSocketWatcher} for details // about why we do this before retaking the manager lock. domainSocketWatcher.add(peer.getDomainSocket(), shm); // The DomainPeer is now our responsibility, and should not be // closed by the caller. 
usedPeer.setValue(true); } finally { lock.lock(); loading = false; finishedLoading.signalAll(); } if (shm.isDisconnected()) { // If the peer closed immediately after the shared memory segment // was created, the DomainSocketWatcher callback might already have // fired and marked the shm as disconnected. In this case, we // obviously don't want to add the SharedMemorySegment to our list // of valid not-full segments. if (LOG.isDebugEnabled()) { LOG.debug(this + ": the UNIX domain socket associated with " + "this short-circuit memory closed before we could make " + "use of the shm."); } } else { notFull.put(shm.getShmId(), shm); } } } }
/**
 * Create a client-side shared memory segment object.
 *
 * @param shmId   the segment id, passed to the superclass constructor
 * @param stream  the stream for the segment, passed to the superclass
 *                constructor
 * @param manager the endpoint manager to record on this object
 * @param peer    the domain peer to record on this object
 * @throws IOException if the superclass constructor throws it
 */
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
    DomainPeer peer) throws IOException {
  super(shmId, stream);
  this.peer = peer;
  this.manager = manager;
}
/**
 * @return the DomainPeer held in this object's {@code peer} field.
 */
public DomainPeer getPeer() {
  return this.peer;
}
/**
 * Open a DomainPeer to the DataNode's UNIX domain socket, using the path
 * configured under {@code DFS_DOMAIN_SOCKET_PATH_KEY}.
 *
 * @param conf configuration supplying the domain socket path
 * @return a DomainPeer wrapping the connected socket
 * @throws IOException if the socket path is not configured, or if
 *         DomainSocket.connect fails
 */
private static DomainPeer getDomainPeerToDn(Configuration conf)
    throws IOException {
  String socketPath = conf.get(DFS_DOMAIN_SOCKET_PATH_KEY);
  if (socketPath == null || socketPath.isEmpty()) {
    // Fail with a descriptive error instead of handing a missing path to
    // DomainSocket.connect.
    throw new IOException("Cannot connect to the DataNode domain socket: "
        + DFS_DOMAIN_SOCKET_PATH_KEY + " is not set");
  }
  DomainSocket sock = DomainSocket.connect(socketPath);
  return new DomainPeer(sock);
}
/** * Get a RemoteBlockReader that communicates over a UNIX domain socket. * * @return The new BlockReader, or null if we failed to create the block * reader. * * @throws InvalidToken If the block token was invalid. * Potentially other security-related execptions. */ private BlockReader getRemoteBlockReaderFromDomain() throws IOException { if (pathInfo == null) { pathInfo = clientContext.getDomainSocketFactory() .getPathInfo(inetSocketAddress, conf.getShortCircuitConf()); } if (!pathInfo.getPathState().getUsableForDataTransfer()) { PerformanceAdvisory.LOG.debug("{}: not trying to create a " + "remote block reader because the UNIX domain socket at {}" + " is not usable.", this, pathInfo); return null; } LOG.trace("{}: trying to create a remote block reader from the UNIX domain " + "socket at {}", this, pathInfo.getPath()); while (true) { BlockReaderPeer curPeer = nextDomainPeer(); if (curPeer == null) break; if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; BlockReader blockReader = null; try { blockReader = getRemoteBlockReader(peer); return blockReader; } catch (IOException ioe) { IOUtilsClient.cleanup(LOG, peer); if (isSecurityException(ioe)) { LOG.trace("{}: got security exception while constructing a remote " + " block reader from the unix domain socket at {}", this, pathInfo.getPath(), ioe); throw ioe; } if (curPeer.fromCache) { // Handle an I/O error we got when using a cached peer. These are // considered less serious because the underlying socket may be stale. LOG.debug("Closed potentially stale domain peer {}", peer, ioe); } else { // Handle an I/O error we got when using a newly created domain peer. // We temporarily disable the domain socket path for a few minutes in // this case, to prevent wasting more time on it. LOG.warn("I/O error constructing remote block reader. 
Disabling " + "domain socket " + peer.getDomainSocket(), ioe); clientContext.getDomainSocketFactory() .disableDomainSocketPath(pathInfo.getPath()); return null; } } finally { if (blockReader == null) { IOUtilsClient.cleanup(LOG, peer); } } } return null; }
/** * Ask the DataNode for a new shared memory segment. This function must be * called with the manager lock held. We will release the lock while * communicating with the DataNode. * * @param clientName The current client name. * @param peer The peer to use to talk to the DataNode. * * @return Null if the DataNode does not support shared memory * segments, or experienced an error creating the * shm. The shared memory segment itself on success. * @throws IOException If there was an error communicating over the socket. * We will not throw an IOException unless the socket * itself (or the network) is the problem. */ private DfsClientShm requestNewShm(String clientName, DomainPeer peer) throws IOException { final DataOutputStream out = new DataOutputStream( new BufferedOutputStream(peer.getOutputStream())); new Sender(out).requestShortCircuitShm(clientName); ShortCircuitShmResponseProto resp = ShortCircuitShmResponseProto.parseFrom( PBHelperClient.vintPrefixed(peer.getInputStream())); String error = resp.hasError() ? resp.getError() : "(unknown)"; switch (resp.getStatus()) { case SUCCESS: DomainSocket sock = peer.getDomainSocket(); byte buf[] = new byte[1]; FileInputStream fis[] = new FileInputStream[1]; if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) { throw new EOFException("got EOF while trying to transfer the " + "file descriptor for the shared memory segment."); } if (fis[0] == null) { throw new IOException("the datanode " + datanode + " failed to " + "pass a file descriptor for the shared memory segment."); } try { DfsClientShm shm = new DfsClientShm(PBHelperClient.convert(resp.getId()), fis[0], this, peer); LOG.trace("{}: createNewShm: created {}", this, shm); return shm; } finally { try { fis[0].close(); } catch (Throwable e) { LOG.debug("Exception in closing " + fis[0], e); } } case ERROR_UNSUPPORTED: // The DataNode just does not support short-circuit shared memory // access, and we should stop asking. 
LOG.info(this + ": datanode does not support short-circuit " + "shared memory access: " + error); disabled = true; return null; default: // The datanode experienced some kind of unexpected error when trying to // create the short-circuit shared memory segment. LOG.warn(this + ": error requesting short-circuit shared memory " + "access: " + error); return null; } }
/** * Allocate a new shared memory slot connected to this datanode. * * Must be called with the EndpointShmManager lock held. * * @param peer The peer to use to talk to the DataNode. * @param usedPeer (out param) Will be set to true if we used the peer. * When a peer is used * * @param clientName The client name. * @param blockId The block ID to use. * @return null if the DataNode does not support shared memory * segments, or experienced an error creating the * shm. The shared memory segment itself on success. * @throws IOException If there was an error communicating over the socket. */ Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, String clientName, ExtendedBlockId blockId) throws IOException { while (true) { if (closed) { LOG.trace("{}: the DfsClientShmManager has been closed.", this); return null; } if (disabled) { LOG.trace("{}: shared memory segment access is disabled.", this); return null; } // Try to use an existing slot. Slot slot = allocSlotFromExistingShm(blockId); if (slot != null) { return slot; } // There are no free slots. If someone is loading more slots, wait // for that to finish. if (loading) { LOG.trace("{}: waiting for loading to finish...", this); finishedLoading.awaitUninterruptibly(); } else { // Otherwise, load the slot ourselves. loading = true; lock.unlock(); DfsClientShm shm; try { shm = requestNewShm(clientName, peer); if (shm == null) continue; // See #{DfsClientShmManager#domainSocketWatcher} for details // about why we do this before retaking the manager lock. domainSocketWatcher.add(peer.getDomainSocket(), shm); // The DomainPeer is now our responsibility, and should not be // closed by the caller. usedPeer.setValue(true); } finally { lock.lock(); loading = false; finishedLoading.signalAll(); } if (shm.isDisconnected()) { // If the peer closed immediately after the shared memory segment // was created, the DomainSocketWatcher callback might already have // fired and marked the shm as disconnected. 
In this case, we // obviously don't want to add the SharedMemorySegment to our list // of valid not-full segments. LOG.debug("{}: the UNIX domain socket associated with this " + "short-circuit memory closed before we could make use of " + "the shm.", this); } else { notFull.put(shm.getShmId(), shm); } } } }
/** * Allocate a new shared memory slot connected to this datanode. * * Must be called with the EndpointShmManager lock held. * * @param peer The peer to use to talk to the DataNode. * @param clientName The client name. * @param usedPeer (out param) Will be set to true if we used the peer. * When a peer is used * * @return null if the DataNode does not support shared memory * segments, or experienced an error creating the * shm. The shared memory segment itself on success. * @throws IOException If there was an error communicating over the socket. */ Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer, String clientName, ExtendedBlockId blockId) throws IOException { while (true) { if (closed) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": the DfsClientShmManager has been closed."); } return null; } if (disabled) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": shared memory segment access is disabled."); } return null; } // Try to use an existing slot. Slot slot = allocSlotFromExistingShm(blockId); if (slot != null) { return slot; } // There are no free slots. If someone is loading more slots, wait // for that to finish. if (loading) { if (LOG.isTraceEnabled()) { LOG.trace(this + ": waiting for loading to finish..."); } finishedLoading.awaitUninterruptibly(); } else { // Otherwise, load the slot ourselves. loading = true; lock.unlock(); DfsClientShm shm; try { shm = requestNewShm(clientName, peer); if (shm == null) continue; // See #{DfsClientShmManager#domainSocketWatcher} for details // about why we do this before retaking the manager lock. domainSocketWatcher.add(peer.getDomainSocket(), shm); // The DomainPeer is now our responsibility, and should not be // closed by the caller. 
usedPeer.setValue(true); } finally { lock.lock(); loading = false; finishedLoading.signalAll(); } if (shm.isDisconnected()) { // If the peer closed immediately after the shared memory segment // was created, the DomainSocketWatcher callback might already have // fired and marked the shm as disconnected. In this case, we // obviously don't want to add the SharedMemorySegment to our list // of valid not-full segments. if (LOG.isDebugEnabled()) { LOG.debug(this + ": the UNIX domain socket associated with " + "this short-circuit memory closed before we could make " + "use of the shm."); } } else { notFull.put(shm.getShmId(), shm); } } } }