Java 类org.apache.hadoop.hdfs.net.DomainPeer 实例源码

项目:hadoop    文件:BlockReaderFactory.java   
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + peer);
      }
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.socketTimeout);
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
项目:hadoop    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:big-c    文件:BlockReaderFactory.java   
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + peer);
      }
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.socketTimeout);
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
项目:big-c    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReaderFactory.java   
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + peer);
      }
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.socketTimeout);
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:FlexMap    文件:BlockReaderFactory.java   
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + peer);
      }
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.socketTimeout);
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
项目:FlexMap    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:hadoop-on-lustre2    文件:BlockReaderFactory.java   
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("nextDomainPeer: reusing existing peer " + peer);
      }
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.socketTimeout);
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
项目:hadoop-on-lustre2    文件:DfsClientShmManager.java   
public Slot allocSlot(DatanodeInfo datanode, DomainPeer peer,
    MutableBoolean usedPeer, ExtendedBlockId blockId,
    String clientName) throws IOException {
  lock.lock();
  try {
    if (closed) {
      LOG.trace(this + ": the DfsClientShmManager isclosed.");
      return null;
    }
    EndpointShmManager shmManager = datanodes.get(datanode);
    if (shmManager == null) {
      shmManager = new EndpointShmManager(datanode);
      datanodes.put(datanode, shmManager);
    }
    return shmManager.allocSlot(peer, usedPeer, clientName, blockId);
  } finally {
    lock.unlock();
  }
}
项目:aliyun-oss-hadoop-fs    文件:BlockReaderFactory.java   
/**
 * Get the next DomainPeer-- either from the cache or by creating it.
 *
 * @return the next DomainPeer, or null if we could not construct one.
 */
private BlockReaderPeer nextDomainPeer() {
  if (remainingCacheTries > 0) {
    Peer peer = clientContext.getPeerCache().get(datanode, true);
    if (peer != null) {
      LOG.trace("nextDomainPeer: reusing existing peer {}", peer);
      return new BlockReaderPeer(peer, true);
    }
  }
  DomainSocket sock = clientContext.getDomainSocketFactory().
      createSocket(pathInfo, conf.getSocketTimeout());
  if (sock == null) return null;
  return new BlockReaderPeer(new DomainPeer(sock), false);
}
项目:hadoop    文件:BlockReaderFactory.java   
/**
 * Get a RemoteBlockReader that communicates over a UNIX domain socket.
 *
 * @return The new BlockReader, or null if we failed to create the block
 * reader.
 *
 * @throws InvalidToken    If the block token was invalid.
 * Potentially other security-related execptions.
 */
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForDataTransfer()) {
    PerformanceAdvisory.LOG.debug(this + ": not trying to create a " +
        "remote block reader because the UNIX domain socket at " +
        pathInfo + " is not usable.");
    return null;
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to create a remote block reader from the " +
        "UNIX domain socket at " + pathInfo.getPath());
  }

  while (true) {
    BlockReaderPeer curPeer = nextDomainPeer();
    if (curPeer == null) break;
    if (curPeer.fromCache) remainingCacheTries--;
    DomainPeer peer = (DomainPeer)curPeer.peer;
    BlockReader blockReader = null;
    try {
      blockReader = getRemoteBlockReader(peer);
      return blockReader;
    } catch (IOException ioe) {
      IOUtils.cleanup(LOG, peer);
      if (isSecurityException(ioe)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": got security exception while constructing " +
              "a remote block reader from the unix domain socket at " +
              pathInfo.getPath(), ioe);
        }
        throw ioe;
      }
      if (curPeer.fromCache) {
        // Handle an I/O error we got when using a cached peer.  These are
        // considered less serious, because the underlying socket may be stale.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Closed potentially stale domain peer " + peer, ioe);
        }
      } else {
        // Handle an I/O error we got when using a newly created domain peer.
        // We temporarily disable the domain socket path for a few minutes in
        // this case, to prevent wasting more time on it.
        LOG.warn("I/O error constructing remote block reader.  Disabling " +
            "domain socket " + peer.getDomainSocket(), ioe);
        clientContext.getDomainSocketFactory()
            .disableDomainSocketPath(pathInfo.getPath());
        return null;
      }
    } finally {
      if (blockReader == null) {
        IOUtils.cleanup(LOG, peer);
      }
    }
  }
  return null;
}
项目:hadoop    文件:DfsClientShmManager.java   
/**
 * Ask the DataNode for a new shared memory segment.  This function must be
 * called with the manager lock held.  We will release the lock while
 * communicating with the DataNode.
 *
 * @param clientName    The current client name.
 * @param peer          The peer to use to talk to the DataNode.
 *
 * @return              Null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 *                        We will not throw an IOException unless the socket
 *                        itself (or the network) is the problem.
 */
private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
    throws IOException {
  final DataOutputStream out = 
      new DataOutputStream(
          new BufferedOutputStream(peer.getOutputStream()));
  new Sender(out).requestShortCircuitShm(clientName);
  ShortCircuitShmResponseProto resp = 
      ShortCircuitShmResponseProto.parseFrom(
          PBHelper.vintPrefixed(peer.getInputStream()));
  String error = resp.hasError() ? resp.getError() : "(unknown)";
  switch (resp.getStatus()) {
  case SUCCESS:
    DomainSocket sock = peer.getDomainSocket();
    byte buf[] = new byte[1];
    FileInputStream fis[] = new FileInputStream[1];
    if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
      throw new EOFException("got EOF while trying to transfer the " +
          "file descriptor for the shared memory segment.");
    }
    if (fis[0] == null) {
      throw new IOException("the datanode " + datanode + " failed to " +
          "pass a file descriptor for the shared memory segment.");
    }
    try {
      DfsClientShm shm = 
          new DfsClientShm(PBHelper.convert(resp.getId()),
              fis[0], this, peer);
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": createNewShm: created " + shm);
      }
      return shm;
    } finally {
      IOUtils.cleanup(LOG,  fis[0]);
    }
  case ERROR_UNSUPPORTED:
    // The DataNode just does not support short-circuit shared memory
    // access, and we should stop asking.
    LOG.info(this + ": datanode does not support short-circuit " +
        "shared memory access: " + error);
    disabled = true;
    return null;
  default:
    // The datanode experienced some kind of unexpected error when trying to
    // create the short-circuit shared memory segment.
    LOG.warn(this + ": error requesting short-circuit shared memory " +
        "access: " + error);
    return null;
  }
}
项目:hadoop    文件:DfsClientShmManager.java   
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held.
 *
 * @param peer          The peer to use to talk to the DataNode.
 * @param usedPeer      (out param) Will be set to true if we used the peer.
 *                        When a peer is used
 *
 * @param clientName    The client name.
 * @param blockId       The block ID to use.
 * @return              null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
    String clientName, ExtendedBlockId blockId) throws IOException {
  while (true) {
    if (closed) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager has been closed.");
      }
      return null;
    }
    if (disabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": shared memory segment access is disabled.");
      }
      return null;
    }
    // Try to use an existing slot.
    Slot slot = allocSlotFromExistingShm(blockId);
    if (slot != null) {
      return slot;
    }
    // There are no free slots.  If someone is loading more slots, wait
    // for that to finish.
    if (loading) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": waiting for loading to finish...");
      }
      finishedLoading.awaitUninterruptibly();
    } else {
      // Otherwise, load the slot ourselves.
      loading = true;
      lock.unlock();
      DfsClientShm shm;
      try {
        shm = requestNewShm(clientName, peer);
        if (shm == null) continue;
        // See #{DfsClientShmManager#domainSocketWatcher} for details
        // about why we do this before retaking the manager lock.
        domainSocketWatcher.add(peer.getDomainSocket(), shm);
        // The DomainPeer is now our responsibility, and should not be
        // closed by the caller.
        usedPeer.setValue(true);
      } finally {
        lock.lock();
        loading = false;
        finishedLoading.signalAll();
      }
      if (shm.isDisconnected()) {
        // If the peer closed immediately after the shared memory segment
        // was created, the DomainSocketWatcher callback might already have
        // fired and marked the shm as disconnected.  In this case, we
        // obviously don't want to add the SharedMemorySegment to our list
        // of valid not-full segments.
        if (LOG.isDebugEnabled()) {
          LOG.debug(this + ": the UNIX domain socket associated with " +
              "this short-circuit memory closed before we could make " +
              "use of the shm.");
        }
      } else {
        notFull.put(shm.getShmId(), shm);
      }
    }
  }
}
项目:hadoop    文件:DfsClientShm.java   
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
    DomainPeer peer) throws IOException {
  super(shmId, stream);
  this.manager = manager;
  this.peer = peer;
}
项目:hadoop    文件:DfsClientShm.java   
public DomainPeer getPeer() {
  return peer;
}
项目:hadoop    文件:TestShortCircuitCache.java   
private static DomainPeer getDomainPeerToDn(Configuration conf)
    throws IOException {
  DomainSocket sock =
      DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
  return new DomainPeer(sock);
}
项目:aliyun-oss-hadoop-fs    文件:BlockReaderFactory.java   
/**
 * Get a RemoteBlockReader that communicates over a UNIX domain socket.
 *
 * @return The new BlockReader, or null if we failed to create the block
 * reader.
 *
 * @throws InvalidToken    If the block token was invalid.
 * Potentially other security-related execptions.
 */
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory()
        .getPathInfo(inetSocketAddress, conf.getShortCircuitConf());
  }
  if (!pathInfo.getPathState().getUsableForDataTransfer()) {
    PerformanceAdvisory.LOG.debug("{}: not trying to create a " +
        "remote block reader because the UNIX domain socket at {}" +
         " is not usable.", this, pathInfo);
    return null;
  }
  LOG.trace("{}: trying to create a remote block reader from the UNIX domain "
      + "socket at {}", this, pathInfo.getPath());

  while (true) {
    BlockReaderPeer curPeer = nextDomainPeer();
    if (curPeer == null) break;
    if (curPeer.fromCache) remainingCacheTries--;
    DomainPeer peer = (DomainPeer)curPeer.peer;
    BlockReader blockReader = null;
    try {
      blockReader = getRemoteBlockReader(peer);
      return blockReader;
    } catch (IOException ioe) {
      IOUtilsClient.cleanup(LOG, peer);
      if (isSecurityException(ioe)) {
        LOG.trace("{}: got security exception while constructing a remote "
                + " block reader from the unix domain socket at {}",
            this, pathInfo.getPath(), ioe);
        throw ioe;
      }
      if (curPeer.fromCache) {
        // Handle an I/O error we got when using a cached peer.  These are
        // considered less serious because the underlying socket may be stale.
        LOG.debug("Closed potentially stale domain peer {}", peer, ioe);
      } else {
        // Handle an I/O error we got when using a newly created domain peer.
        // We temporarily disable the domain socket path for a few minutes in
        // this case, to prevent wasting more time on it.
        LOG.warn("I/O error constructing remote block reader.  Disabling " +
            "domain socket " + peer.getDomainSocket(), ioe);
        clientContext.getDomainSocketFactory()
            .disableDomainSocketPath(pathInfo.getPath());
        return null;
      }
    } finally {
      if (blockReader == null) {
        IOUtilsClient.cleanup(LOG, peer);
      }
    }
  }
  return null;
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShmManager.java   
/**
 * Ask the DataNode for a new shared memory segment.  This function must be
 * called with the manager lock held.  We will release the lock while
 * communicating with the DataNode.
 *
 * @param clientName    The current client name.
 * @param peer          The peer to use to talk to the DataNode.
 *
 * @return              Null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 *                        We will not throw an IOException unless the socket
 *                        itself (or the network) is the problem.
 */
private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
    throws IOException {
  final DataOutputStream out =
      new DataOutputStream(
          new BufferedOutputStream(peer.getOutputStream()));
  new Sender(out).requestShortCircuitShm(clientName);
  ShortCircuitShmResponseProto resp =
      ShortCircuitShmResponseProto.parseFrom(
        PBHelperClient.vintPrefixed(peer.getInputStream()));
  String error = resp.hasError() ? resp.getError() : "(unknown)";
  switch (resp.getStatus()) {
  case SUCCESS:
    DomainSocket sock = peer.getDomainSocket();
    byte buf[] = new byte[1];
    FileInputStream fis[] = new FileInputStream[1];
    if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
      throw new EOFException("got EOF while trying to transfer the " +
          "file descriptor for the shared memory segment.");
    }
    if (fis[0] == null) {
      throw new IOException("the datanode " + datanode + " failed to " +
          "pass a file descriptor for the shared memory segment.");
    }
    try {
      DfsClientShm shm =
          new DfsClientShm(PBHelperClient.convert(resp.getId()),
              fis[0], this, peer);
      LOG.trace("{}: createNewShm: created {}", this, shm);
      return shm;
    } finally {
      try {
        fis[0].close();
      } catch (Throwable e) {
        LOG.debug("Exception in closing " + fis[0], e);
      }
    }
  case ERROR_UNSUPPORTED:
    // The DataNode just does not support short-circuit shared memory
    // access, and we should stop asking.
    LOG.info(this + ": datanode does not support short-circuit " +
        "shared memory access: " + error);
    disabled = true;
    return null;
  default:
    // The datanode experienced some kind of unexpected error when trying to
    // create the short-circuit shared memory segment.
    LOG.warn(this + ": error requesting short-circuit shared memory " +
        "access: " + error);
    return null;
  }
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShmManager.java   
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held.
 *
 * @param peer          The peer to use to talk to the DataNode.
 * @param usedPeer      (out param) Will be set to true if we used the peer.
 *                        When a peer is used
 *
 * @param clientName    The client name.
 * @param blockId       The block ID to use.
 * @return              null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
    String clientName, ExtendedBlockId blockId) throws IOException {
  while (true) {
    if (closed) {
      LOG.trace("{}: the DfsClientShmManager has been closed.", this);
      return null;
    }
    if (disabled) {
      LOG.trace("{}: shared memory segment access is disabled.", this);
      return null;
    }
    // Try to use an existing slot.
    Slot slot = allocSlotFromExistingShm(blockId);
    if (slot != null) {
      return slot;
    }
    // There are no free slots.  If someone is loading more slots, wait
    // for that to finish.
    if (loading) {
      LOG.trace("{}: waiting for loading to finish...", this);
      finishedLoading.awaitUninterruptibly();
    } else {
      // Otherwise, load the slot ourselves.
      loading = true;
      lock.unlock();
      DfsClientShm shm;
      try {
        shm = requestNewShm(clientName, peer);
        if (shm == null) continue;
        // See #{DfsClientShmManager#domainSocketWatcher} for details
        // about why we do this before retaking the manager lock.
        domainSocketWatcher.add(peer.getDomainSocket(), shm);
        // The DomainPeer is now our responsibility, and should not be
        // closed by the caller.
        usedPeer.setValue(true);
      } finally {
        lock.lock();
        loading = false;
        finishedLoading.signalAll();
      }
      if (shm.isDisconnected()) {
        // If the peer closed immediately after the shared memory segment
        // was created, the DomainSocketWatcher callback might already have
        // fired and marked the shm as disconnected.  In this case, we
        // obviously don't want to add the SharedMemorySegment to our list
        // of valid not-full segments.
        LOG.debug("{}: the UNIX domain socket associated with this "
            + "short-circuit memory closed before we could make use of "
            + "the shm.", this);
      } else {
        notFull.put(shm.getShmId(), shm);
      }
    }
  }
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShm.java   
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
    DomainPeer peer) throws IOException {
  super(shmId, stream);
  this.manager = manager;
  this.peer = peer;
}
项目:aliyun-oss-hadoop-fs    文件:DfsClientShm.java   
public DomainPeer getPeer() {
  return peer;
}
项目:aliyun-oss-hadoop-fs    文件:TestShortCircuitCache.java   
private static DomainPeer getDomainPeerToDn(Configuration conf)
    throws IOException {
  DomainSocket sock =
      DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
  return new DomainPeer(sock);
}
项目:big-c    文件:BlockReaderFactory.java   
/**
 * Get a RemoteBlockReader that communicates over a UNIX domain socket.
 *
 * @return The new BlockReader, or null if we failed to create the block
 * reader.
 *
 * @throws InvalidToken    If the block token was invalid.
 * Potentially other security-related execptions.
 */
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForDataTransfer()) {
    PerformanceAdvisory.LOG.debug(this + ": not trying to create a " +
        "remote block reader because the UNIX domain socket at " +
        pathInfo + " is not usable.");
    return null;
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to create a remote block reader from the " +
        "UNIX domain socket at " + pathInfo.getPath());
  }

  while (true) {
    BlockReaderPeer curPeer = nextDomainPeer();
    if (curPeer == null) break;
    if (curPeer.fromCache) remainingCacheTries--;
    DomainPeer peer = (DomainPeer)curPeer.peer;
    BlockReader blockReader = null;
    try {
      blockReader = getRemoteBlockReader(peer);
      return blockReader;
    } catch (IOException ioe) {
      IOUtils.cleanup(LOG, peer);
      if (isSecurityException(ioe)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": got security exception while constructing " +
              "a remote block reader from the unix domain socket at " +
              pathInfo.getPath(), ioe);
        }
        throw ioe;
      }
      if (curPeer.fromCache) {
        // Handle an I/O error we got when using a cached peer.  These are
        // considered less serious, because the underlying socket may be stale.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Closed potentially stale domain peer " + peer, ioe);
        }
      } else {
        // Handle an I/O error we got when using a newly created domain peer.
        // We temporarily disable the domain socket path for a few minutes in
        // this case, to prevent wasting more time on it.
        LOG.warn("I/O error constructing remote block reader.  Disabling " +
            "domain socket " + peer.getDomainSocket(), ioe);
        clientContext.getDomainSocketFactory()
            .disableDomainSocketPath(pathInfo.getPath());
        return null;
      }
    } finally {
      if (blockReader == null) {
        IOUtils.cleanup(LOG, peer);
      }
    }
  }
  return null;
}
项目:big-c    文件:DfsClientShmManager.java   
/**
 * Ask the DataNode for a new shared memory segment.  This function must be
 * called with the manager lock held.  We will release the lock while
 * communicating with the DataNode.
 *
 * @param clientName    The current client name.
 * @param peer          The peer to use to talk to the DataNode.
 *
 * @return              Null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 *                        We will not throw an IOException unless the socket
 *                        itself (or the network) is the problem.
 */
private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
    throws IOException {
  final DataOutputStream out = 
      new DataOutputStream(
          new BufferedOutputStream(peer.getOutputStream()));
  new Sender(out).requestShortCircuitShm(clientName);
  ShortCircuitShmResponseProto resp = 
      ShortCircuitShmResponseProto.parseFrom(
          PBHelper.vintPrefixed(peer.getInputStream()));
  String error = resp.hasError() ? resp.getError() : "(unknown)";
  switch (resp.getStatus()) {
  case SUCCESS:
    DomainSocket sock = peer.getDomainSocket();
    byte buf[] = new byte[1];
    FileInputStream fis[] = new FileInputStream[1];
    if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
      throw new EOFException("got EOF while trying to transfer the " +
          "file descriptor for the shared memory segment.");
    }
    if (fis[0] == null) {
      throw new IOException("the datanode " + datanode + " failed to " +
          "pass a file descriptor for the shared memory segment.");
    }
    try {
      DfsClientShm shm = 
          new DfsClientShm(PBHelper.convert(resp.getId()),
              fis[0], this, peer);
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": createNewShm: created " + shm);
      }
      return shm;
    } finally {
      IOUtils.cleanup(LOG,  fis[0]);
    }
  case ERROR_UNSUPPORTED:
    // The DataNode just does not support short-circuit shared memory
    // access, and we should stop asking.
    LOG.info(this + ": datanode does not support short-circuit " +
        "shared memory access: " + error);
    disabled = true;
    return null;
  default:
    // The datanode experienced some kind of unexpected error when trying to
    // create the short-circuit shared memory segment.
    LOG.warn(this + ": error requesting short-circuit shared memory " +
        "access: " + error);
    return null;
  }
}
项目:big-c    文件:DfsClientShmManager.java   
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held.
 *
 * @param peer          The peer to use to talk to the DataNode.
 * @param usedPeer      (out param) Will be set to true if we used the peer.
 *                        When a peer is used
 *
 * @param clientName    The client name.
 * @param blockId       The block ID to use.
 * @return              null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
    String clientName, ExtendedBlockId blockId) throws IOException {
  while (true) {
    if (closed) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager has been closed.");
      }
      return null;
    }
    if (disabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": shared memory segment access is disabled.");
      }
      return null;
    }
    // Try to use an existing slot.
    Slot slot = allocSlotFromExistingShm(blockId);
    if (slot != null) {
      return slot;
    }
    // There are no free slots.  If someone is loading more slots, wait
    // for that to finish.
    if (loading) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": waiting for loading to finish...");
      }
      finishedLoading.awaitUninterruptibly();
    } else {
      // Otherwise, load the slot ourselves.
      loading = true;
      lock.unlock();
      DfsClientShm shm;
      try {
        shm = requestNewShm(clientName, peer);
        if (shm == null) continue;
        // See #{DfsClientShmManager#domainSocketWatcher} for details
        // about why we do this before retaking the manager lock.
        domainSocketWatcher.add(peer.getDomainSocket(), shm);
        // The DomainPeer is now our responsibility, and should not be
        // closed by the caller.
        usedPeer.setValue(true);
      } finally {
        lock.lock();
        loading = false;
        finishedLoading.signalAll();
      }
      if (shm.isDisconnected()) {
        // If the peer closed immediately after the shared memory segment
        // was created, the DomainSocketWatcher callback might already have
        // fired and marked the shm as disconnected.  In this case, we
        // obviously don't want to add the SharedMemorySegment to our list
        // of valid not-full segments.
        if (LOG.isDebugEnabled()) {
          LOG.debug(this + ": the UNIX domain socket associated with " +
              "this short-circuit memory closed before we could make " +
              "use of the shm.");
        }
      } else {
        notFull.put(shm.getShmId(), shm);
      }
    }
  }
}
项目:big-c    文件:DfsClientShm.java   
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
    DomainPeer peer) throws IOException {
  super(shmId, stream);
  this.manager = manager;
  this.peer = peer;
}
项目:big-c    文件:DfsClientShm.java   
public DomainPeer getPeer() {
  return peer;
}
项目:big-c    文件:TestShortCircuitCache.java   
private static DomainPeer getDomainPeerToDn(Configuration conf)
    throws IOException {
  DomainSocket sock =
      DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
  return new DomainPeer(sock);
}
项目:hadoop-2.6.0-cdh5.4.3    文件:BlockReaderFactory.java   
/**
 * Get a RemoteBlockReader that communicates over a UNIX domain socket.
 *
 * @return The new BlockReader, or null if we failed to create the block
 * reader.
 *
 * @throws InvalidToken    If the block token was invalid.
 * Potentially other security-related execptions.
 */
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForDataTransfer()) {
    PerformanceAdvisory.LOG.debug(this + ": not trying to create a " +
        "remote block reader because the UNIX domain socket at " +
        pathInfo + " is not usable.");
    return null;
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to create a remote block reader from the " +
        "UNIX domain socket at " + pathInfo.getPath());
  }

  while (true) {
    BlockReaderPeer curPeer = nextDomainPeer();
    if (curPeer == null) break;
    if (curPeer.fromCache) remainingCacheTries--;
    DomainPeer peer = (DomainPeer)curPeer.peer;
    BlockReader blockReader = null;
    try {
      blockReader = getRemoteBlockReader(peer);
      return blockReader;
    } catch (IOException ioe) {
      IOUtils.cleanup(LOG, peer);
      if (isSecurityException(ioe)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": got security exception while constructing " +
              "a remote block reader from the unix domain socket at " +
              pathInfo.getPath(), ioe);
        }
        throw ioe;
      }
      if (curPeer.fromCache) {
        // Handle an I/O error we got when using a cached peer.  These are
        // considered less serious, because the underlying socket may be stale.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Closed potentially stale domain peer " + peer, ioe);
        }
      } else {
        // Handle an I/O error we got when using a newly created domain peer.
        // We temporarily disable the domain socket path for a few minutes in
        // this case, to prevent wasting more time on it.
        LOG.warn("I/O error constructing remote block reader.  Disabling " +
            "domain socket " + peer.getDomainSocket(), ioe);
        clientContext.getDomainSocketFactory()
            .disableDomainSocketPath(pathInfo.getPath());
        return null;
      }
    } finally {
      if (blockReader == null) {
        IOUtils.cleanup(LOG, peer);
      }
    }
  }
  return null;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShmManager.java   
/**
 * Ask the DataNode for a new shared memory segment.  This function must be
 * called with the manager lock held.  We will release the lock while
 * communicating with the DataNode.
 *
 * @param clientName    The current client name.
 * @param peer          The peer to use to talk to the DataNode.
 *
 * @return              Null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 *                        We will not throw an IOException unless the socket
 *                        itself (or the network) is the problem.
 */
private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
    throws IOException {
  final DataOutputStream out = 
      new DataOutputStream(
          new BufferedOutputStream(peer.getOutputStream()));
  new Sender(out).requestShortCircuitShm(clientName);
  ShortCircuitShmResponseProto resp = 
      ShortCircuitShmResponseProto.parseFrom(
          PBHelper.vintPrefixed(peer.getInputStream()));
  String error = resp.hasError() ? resp.getError() : "(unknown)";
  switch (resp.getStatus()) {
  case SUCCESS:
    DomainSocket sock = peer.getDomainSocket();
    byte buf[] = new byte[1];
    FileInputStream fis[] = new FileInputStream[1];
    if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
      throw new EOFException("got EOF while trying to transfer the " +
          "file descriptor for the shared memory segment.");
    }
    if (fis[0] == null) {
      throw new IOException("the datanode " + datanode + " failed to " +
          "pass a file descriptor for the shared memory segment.");
    }
    try {
      DfsClientShm shm = 
          new DfsClientShm(PBHelper.convert(resp.getId()),
              fis[0], this, peer);
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": createNewShm: created " + shm);
      }
      return shm;
    } finally {
      IOUtils.cleanup(LOG,  fis[0]);
    }
  case ERROR_UNSUPPORTED:
    // The DataNode just does not support short-circuit shared memory
    // access, and we should stop asking.
    LOG.info(this + ": datanode does not support short-circuit " +
        "shared memory access: " + error);
    disabled = true;
    return null;
  default:
    // The datanode experienced some kind of unexpected error when trying to
    // create the short-circuit shared memory segment.
    LOG.warn(this + ": error requesting short-circuit shared memory " +
        "access: " + error);
    return null;
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShmManager.java   
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held.
 *
 * @param peer          The peer to use to talk to the DataNode.
 * @param usedPeer      (out param) Will be set to true if we used the peer.
 *                        When a peer is used
 *
 * @param clientName    The client name.
 * @param blockId       The block ID to use.
 * @return              null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
    String clientName, ExtendedBlockId blockId) throws IOException {
  while (true) {
    if (closed) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager has been closed.");
      }
      return null;
    }
    if (disabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": shared memory segment access is disabled.");
      }
      return null;
    }
    // Try to use an existing slot.
    Slot slot = allocSlotFromExistingShm(blockId);
    if (slot != null) {
      return slot;
    }
    // There are no free slots.  If someone is loading more slots, wait
    // for that to finish.
    if (loading) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": waiting for loading to finish...");
      }
      finishedLoading.awaitUninterruptibly();
    } else {
      // Otherwise, load the slot ourselves.
      loading = true;
      lock.unlock();
      DfsClientShm shm;
      try {
        shm = requestNewShm(clientName, peer);
        if (shm == null) continue;
        // See #{DfsClientShmManager#domainSocketWatcher} for details
        // about why we do this before retaking the manager lock.
        domainSocketWatcher.add(peer.getDomainSocket(), shm);
        // The DomainPeer is now our responsibility, and should not be
        // closed by the caller.
        usedPeer.setValue(true);
      } finally {
        lock.lock();
        loading = false;
        finishedLoading.signalAll();
      }
      if (shm.isDisconnected()) {
        // If the peer closed immediately after the shared memory segment
        // was created, the DomainSocketWatcher callback might already have
        // fired and marked the shm as disconnected.  In this case, we
        // obviously don't want to add the SharedMemorySegment to our list
        // of valid not-full segments.
        if (LOG.isDebugEnabled()) {
          LOG.debug(this + ": the UNIX domain socket associated with " +
              "this short-circuit memory closed before we could make " +
              "use of the shm.");
        }
      } else {
        notFull.put(shm.getShmId(), shm);
      }
    }
  }
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShm.java   
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
    DomainPeer peer) throws IOException {
  super(shmId, stream);
  this.manager = manager;
  this.peer = peer;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:DfsClientShm.java   
public DomainPeer getPeer() {
  return peer;
}
项目:hadoop-2.6.0-cdh5.4.3    文件:TestShortCircuitCache.java   
private static DomainPeer getDomainPeerToDn(Configuration conf)
    throws IOException {
  DomainSocket sock =
      DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
  return new DomainPeer(sock);
}
项目:FlexMap    文件:BlockReaderFactory.java   
/**
 * Get a RemoteBlockReader that communicates over a UNIX domain socket.
 *
 * @return The new BlockReader, or null if we failed to create the block
 * reader.
 *
 * @throws InvalidToken    If the block token was invalid.
 * Potentially other security-related execptions.
 */
private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
  if (pathInfo == null) {
    pathInfo = clientContext.getDomainSocketFactory().
                    getPathInfo(inetSocketAddress, conf);
  }
  if (!pathInfo.getPathState().getUsableForDataTransfer()) {
    PerformanceAdvisory.LOG.debug(this + ": not trying to create a " +
        "remote block reader because the UNIX domain socket at " +
        pathInfo + " is not usable.");
    return null;
  }
  if (LOG.isTraceEnabled()) {
    LOG.trace(this + ": trying to create a remote block reader from the " +
        "UNIX domain socket at " + pathInfo.getPath());
  }

  while (true) {
    BlockReaderPeer curPeer = nextDomainPeer();
    if (curPeer == null) break;
    if (curPeer.fromCache) remainingCacheTries--;
    DomainPeer peer = (DomainPeer)curPeer.peer;
    BlockReader blockReader = null;
    try {
      blockReader = getRemoteBlockReader(peer);
      return blockReader;
    } catch (IOException ioe) {
      IOUtils.cleanup(LOG, peer);
      if (isSecurityException(ioe)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace(this + ": got security exception while constructing " +
              "a remote block reader from the unix domain socket at " +
              pathInfo.getPath(), ioe);
        }
        throw ioe;
      }
      if (curPeer.fromCache) {
        // Handle an I/O error we got when using a cached peer.  These are
        // considered less serious, because the underlying socket may be stale.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Closed potentially stale domain peer " + peer, ioe);
        }
      } else {
        // Handle an I/O error we got when using a newly created domain peer.
        // We temporarily disable the domain socket path for a few minutes in
        // this case, to prevent wasting more time on it.
        LOG.warn("I/O error constructing remote block reader.  Disabling " +
            "domain socket " + peer.getDomainSocket(), ioe);
        clientContext.getDomainSocketFactory()
            .disableDomainSocketPath(pathInfo.getPath());
        return null;
      }
    } finally {
      if (blockReader == null) {
        IOUtils.cleanup(LOG, peer);
      }
    }
  }
  return null;
}
项目:FlexMap    文件:DfsClientShmManager.java   
/**
 * Ask the DataNode for a new shared memory segment.  This function must be
 * called with the manager lock held.  We will release the lock while
 * communicating with the DataNode.
 *
 * @param clientName    The current client name.
 * @param peer          The peer to use to talk to the DataNode.
 *
 * @return              Null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 *                        We will not throw an IOException unless the socket
 *                        itself (or the network) is the problem.
 */
private DfsClientShm requestNewShm(String clientName, DomainPeer peer)
    throws IOException {
  final DataOutputStream out = 
      new DataOutputStream(
          new BufferedOutputStream(peer.getOutputStream()));
  new Sender(out).requestShortCircuitShm(clientName);
  ShortCircuitShmResponseProto resp = 
      ShortCircuitShmResponseProto.parseFrom(
          PBHelper.vintPrefixed(peer.getInputStream()));
  String error = resp.hasError() ? resp.getError() : "(unknown)";
  switch (resp.getStatus()) {
  case SUCCESS:
    DomainSocket sock = peer.getDomainSocket();
    byte buf[] = new byte[1];
    FileInputStream fis[] = new FileInputStream[1];
    if (sock.recvFileInputStreams(fis, buf, 0, buf.length) < 0) {
      throw new EOFException("got EOF while trying to transfer the " +
          "file descriptor for the shared memory segment.");
    }
    if (fis[0] == null) {
      throw new IOException("the datanode " + datanode + " failed to " +
          "pass a file descriptor for the shared memory segment.");
    }
    try {
      DfsClientShm shm = 
          new DfsClientShm(PBHelper.convert(resp.getId()),
              fis[0], this, peer);
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": createNewShm: created " + shm);
      }
      return shm;
    } finally {
      IOUtils.cleanup(LOG,  fis[0]);
    }
  case ERROR_UNSUPPORTED:
    // The DataNode just does not support short-circuit shared memory
    // access, and we should stop asking.
    LOG.info(this + ": datanode does not support short-circuit " +
        "shared memory access: " + error);
    disabled = true;
    return null;
  default:
    // The datanode experienced some kind of unexpected error when trying to
    // create the short-circuit shared memory segment.
    LOG.warn(this + ": error requesting short-circuit shared memory " +
        "access: " + error);
    return null;
  }
}
项目:FlexMap    文件:DfsClientShmManager.java   
/**
 * Allocate a new shared memory slot connected to this datanode.
 *
 * Must be called with the EndpointShmManager lock held.
 *
 * @param peer          The peer to use to talk to the DataNode.
 * @param clientName    The client name.
 * @param usedPeer      (out param) Will be set to true if we used the peer.
 *                        When a peer is used
 *
 * @return              null if the DataNode does not support shared memory
 *                        segments, or experienced an error creating the
 *                        shm.  The shared memory segment itself on success.
 * @throws IOException  If there was an error communicating over the socket.
 */
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
    String clientName, ExtendedBlockId blockId) throws IOException {
  while (true) {
    if (closed) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": the DfsClientShmManager has been closed.");
      }
      return null;
    }
    if (disabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": shared memory segment access is disabled.");
      }
      return null;
    }
    // Try to use an existing slot.
    Slot slot = allocSlotFromExistingShm(blockId);
    if (slot != null) {
      return slot;
    }
    // There are no free slots.  If someone is loading more slots, wait
    // for that to finish.
    if (loading) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": waiting for loading to finish...");
      }
      finishedLoading.awaitUninterruptibly();
    } else {
      // Otherwise, load the slot ourselves.
      loading = true;
      lock.unlock();
      DfsClientShm shm;
      try {
        shm = requestNewShm(clientName, peer);
        if (shm == null) continue;
        // See #{DfsClientShmManager#domainSocketWatcher} for details
        // about why we do this before retaking the manager lock.
        domainSocketWatcher.add(peer.getDomainSocket(), shm);
        // The DomainPeer is now our responsibility, and should not be
        // closed by the caller.
        usedPeer.setValue(true);
      } finally {
        lock.lock();
        loading = false;
        finishedLoading.signalAll();
      }
      if (shm.isDisconnected()) {
        // If the peer closed immediately after the shared memory segment
        // was created, the DomainSocketWatcher callback might already have
        // fired and marked the shm as disconnected.  In this case, we
        // obviously don't want to add the SharedMemorySegment to our list
        // of valid not-full segments.
        if (LOG.isDebugEnabled()) {
          LOG.debug(this + ": the UNIX domain socket associated with " +
              "this short-circuit memory closed before we could make " +
              "use of the shm.");
        }
      } else {
        notFull.put(shm.getShmId(), shm);
      }
    }
  }
}
项目:FlexMap    文件:DfsClientShm.java   
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
    DomainPeer peer) throws IOException {
  super(shmId, stream);
  this.manager = manager;
  this.peer = peer;
}