static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken) { BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() .setBlock(PBHelper.convert(blk)) .setToken(PBHelper.convert(blockToken)); if (Trace.isTracing()) { Span s = Trace.currentSpan(); builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() .setTraceId(s.getTraceId()) .setParentId(s.getSpanId())); } return builder.build(); }
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken) { BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() .setBlock(PBHelperClient.convert(blk)) .setToken(PBHelperClient.convert(blockToken)); SpanId spanId = Tracer.getCurrentSpanId(); if (spanId.isValid()) { builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() .setTraceId(spanId.getHigh()) .setParentId(spanId.getLow())); } return builder.build(); }
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken) { return BaseHeaderProto.newBuilder() .setBlock(PBHelper.convert(blk)) .setToken(PBHelper.convert(blockToken)) .build(); }
public static TraceScope continueTraceSpan(BaseHeaderProto header, String description) { return continueTraceSpan(header.getTraceInfo(), description); }
private TraceScope continueTraceSpan(BaseHeaderProto header, String description) { return continueTraceSpan(header.getTraceInfo(), description); }
static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken) { return BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blk)) .setToken(PBHelper.convert(blockToken)).build(); }
private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) { Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy)) .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()))) .setClientName(clientName).build(); ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) .setRequestedChecksum(checksumProto) .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); for (int i = 0; i < datanodeInfos.length; i++) { DatanodeInfo dnInfo = datanodeInfos[i]; Enum<?> storageType = storageTypes[i]; Promise<Channel> promise = eventLoopGroup.next().newPromise(); futureList.add(promise); String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); new Bootstrap().group(eventLoopGroup).channel(channelClass) .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // we need to get the remote address of the channel so we can only move on after // channel connected. Leave an empty implementation here because netty does not allow // a null handler. } }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, timeoutMs, client, locatedBlock.getBlockToken(), promise); } else { promise.tryFailure(future.cause()); } } }); } return futureList; }