/**
 * Asks a region server, through the OPEN RPC, to open a single region.
 * <p>
 * The open is expected to succeed, but it can fail if the server has just crashed.
 *
 * @param server region server that should open the region
 * @param region region to open
 * @param versionOfOfflineNode version that must be present in the offline node when the
 *          region server moves the region from OFFLINE to another state
 * @param favoredNodes favored nodes passed along in the open request
 * @return the opening state decoded from the response, or
 *         {@link RegionOpeningState#FAILED_OPENING} when no admin interface is available
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised by the RPC
 */
public RegionOpeningState sendRegionOpen(final ServerName server,
    HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
    throws IOException {
  final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
  if (rsAdmin == null) {
    LOG.warn("Attempting to send OPEN RPC to server " + server.toString()
        + " failed because no RPC connection found to this server");
    return RegionOpeningState.FAILED_OPENING;
  }

  // The request carries a flag telling whether the master file system is in LOG_REPLAY mode.
  final boolean inLogReplay =
      RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode();
  final OpenRegionRequest openRequest = RequestConverter.buildOpenRegionRequest(
      server, region, versionOfOfflineNode, favoredNodes, inLogReplay);

  try {
    return ResponseConverter.getRegionOpeningState(rsAdmin.openRegion(null, openRequest));
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
/**
 * Asks a region server, through a single OPEN RPC, to open a batch of regions.
 * <p>
 * The open is expected to succeed, but it can fail if the server has just crashed.
 *
 * @param server region server that should open the regions
 * @param regionOpenInfos one triple per region to open (region info, an integer and a list of
 *          server names), handed to the request builder as-is
 * @return the list of opening states decoded from the response, or {@code null} when no
 *         admin interface is available for {@code server}
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised by the RPC
 */
public List<RegionOpeningState> sendRegionOpen(ServerName server,
    List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
    throws IOException {
  final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
  if (rsAdmin == null) {
    LOG.warn("Attempting to send OPEN RPC to server " + server.toString()
        + " failed because no RPC connection found to this server");
    // Callers see null (not an empty list) when the RPC could not even be attempted.
    return null;
  }

  // The request carries a flag telling whether the master file system is in LOG_REPLAY mode.
  final boolean inLogReplay =
      RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode();
  final OpenRegionRequest openRequest =
      RequestConverter.buildOpenRegionRequest(server, regionOpenInfos, inLogReplay);

  try {
    return ResponseConverter.getRegionOpeningStateList(rsAdmin.openRegion(null, openRequest));
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
/**
 * Sends a MERGE REGIONS RPC asking a region server to merge two regions.
 * <p>
 * The call is made with a fresh controller from {@code newRpcController()} and delegated to
 * {@code ProtobufUtil.mergeRegions}.
 *
 * @param server server that should perform the merge; must not be null
 * @param region_a first region to merge; must not be null
 * @param region_b second region to merge; must not be null
 * @param forcible true to do a compulsory merge, otherwise only two adjacent regions are
 *          merged
 * @throws NullPointerException if {@code server} or either region is null
 * @throws IOException if no admin interface is available for {@code server}, or if obtaining
 *           it or the merge call itself fails
 */
public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
    HRegionInfo region_b, boolean forcible) throws IOException {
  if (server == null) {
    throw new NullPointerException("Passed server is null");
  }
  if (region_a == null || region_b == null) {
    throw new NullPointerException("Passed region is null");
  }

  final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
  if (rsAdmin == null) {
    final String failure = "Attempting to send MERGE REGIONS RPC to server "
        + server.toString() + " for region "
        + region_a.getRegionNameAsString() + ","
        + region_b.getRegionNameAsString()
        + " failed because no RPC connection found to this server";
    throw new IOException(failure);
  }

  final PayloadCarryingRpcController rpcController = newRpcController();
  ProtobufUtil.mergeRegions(rpcController, rsAdmin, region_a, region_b, forcible);
}
/** * @param sn * @return Admin interface for the remote regionserver named <code>sn</code> * @throws IOException * @throws RetriesExhaustedException wrapping a ConnectException if failed */ private AdminService.BlockingInterface getRsAdmin(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.rsAdmins.get(sn); if (admin == null) { LOG.debug("New admin connection to " + sn.toString()); if (sn.equals(master.getServerName()) && master instanceof HRegionServer) { // A master is also a region server now, see HBASE-10569 for details admin = ((HRegionServer)master).getRSRpcServices(); } else { admin = this.connection.getAdmin(sn); } this.rsAdmins.put(sn, admin); } return admin; }
/**
 * Replays a batch of WAL entries on the region server returned by {@code getLocation()}.
 * <p>
 * Does nothing when {@code entries} is empty.
 *
 * @param regionInfo region the entries belong to; not referenced by this method body
 * @param entries WAL entries to replay
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised by the replay call
 * @throws ServiceException declared by the signature
 */
private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }

  final Entry[] batch = entries.toArray(new Entry[entries.size()]);
  final AdminService.BlockingInterface targetServer =
      conn.getAdmin(getLocation().getServerName());

  // The builder yields the protobuf request plus a cell scanner that rides on the controller.
  final Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);
  final PayloadCarryingRpcController rpcController =
      rpcControllerFactory.newController(requestAndCells.getSecond());

  try {
    targetServer.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
@Test public void testReportBadSink() { ServerName serverNameA = mock(ServerName.class); ServerName serverNameB = mock(ServerName.class); when(replicationEndpoint.getRegionServers()) .thenReturn(Lists.newArrayList(serverNameA, serverNameB)); sinkManager.chooseSinks(); // Sanity check assertEquals(1, sinkManager.getNumSinks()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, sinkManager.getNumSinks()); }
/**
 * Helper that closes a region, identified by its region name, over the admin protocol.
 *
 * @param controller RPC controller passed to the close call
 * @param admin admin interface of the region server to call
 * @param server server name used when building the close request
 * @param regionName name of the region to close
 * @param versionOfClosingNode version value placed in the close request
 * @param destinationServer destination server placed in the close request
 * @param transitionInZK ZK-transition flag placed in the close request
 * @return true if the region is closed
 * @throws IOException as converted from the {@code ServiceException} raised by the RPC
 */
public static boolean closeRegion(final RpcController controller,
    final AdminService.BlockingInterface admin, final ServerName server,
    final byte[] regionName, final int versionOfClosingNode,
    final ServerName destinationServer, final boolean transitionInZK)
    throws IOException {
  final CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(
      server, regionName, versionOfClosingNode, destinationServer, transitionInZK);
  try {
    return ResponseConverter.isClosed(admin.closeRegion(controller, request));
  } catch (ServiceException se) {
    throw getRemoteException(se);
  }
}
/** * Stop the designated regionserver * @param hostnamePort Hostname and port delimited by a <code>:</code> as in * <code>example.org:1234</code> * @throws IOException if a remote or network exception occurs */ @Override public synchronized void stopRegionServer(final String hostnamePort) throws IOException { String hostname = Addressing.parseHostname(hostnamePort); int port = Addressing.parsePort(hostnamePort); AdminService.BlockingInterface admin = this.connection.getAdmin(ServerName.valueOf(hostname, port, 0)); StopServerRequest request = RequestConverter.buildStopServerRequest( "Called by admin client " + this.connection.toString()); PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(HConstants.HIGH_QOS); try { // TODO: this does not do retries, it should. Set priority and timeout in controller admin.stopServer(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } }
/** * {@inheritDoc} */ @Override public CompactionState getCompactionStateForRegion(final byte[] regionName) throws IOException { try { Pair<HRegionInfo, ServerName> regionServerPair = getRegion(regionName); if (regionServerPair == null) { throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName)); } if (regionServerPair.getSecond() == null) { throw new NoServerForRegionException(Bytes.toStringBinary(regionName)); } ServerName sn = regionServerPair.getSecond(); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); GetRegionInfoRequest request = RequestConverter.buildGetRegionInfoRequest( regionServerPair.getFirst().getRegionName(), true); PayloadCarryingRpcController controller = rpcControllerFactory.newController(); // TODO: this does not do retries, it should. Set priority and timeout in controller GetRegionInfoResponse response = admin.getRegionInfo(controller, request); return response.getCompactionState(); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } }
@Override // Nothing is done w/ the 'master' parameter. It is ignored. public AdminService.BlockingInterface getAdmin(final ServerName serverName, final boolean master) throws IOException { if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } String key = getStubKey(AdminService.BlockingInterface.class.getName(), serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange); this.connectionLock.putIfAbsent(key, key); AdminService.BlockingInterface stub = null; synchronized (this.connectionLock.get(key)) { stub = (AdminService.BlockingInterface)this.stubs.get(key); if (stub == null) { BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); stub = AdminService.newBlockingStub(channel); this.stubs.put(key, stub); } } return stub; }
/**
 * Sends a MERGE REGIONS RPC asking a region server to merge two regions.
 * <p>
 * The call is delegated to {@code ProtobufUtil.mergeRegions}.
 *
 * @param server server that should perform the merge; must not be null
 * @param region_a first region to merge; must not be null
 * @param region_b second region to merge; must not be null
 * @param forcible true to do a compulsory merge, otherwise only two adjacent regions are
 *          merged
 * @throws NullPointerException if {@code server} or either region is null
 * @throws IOException if no admin interface is available for {@code server}, or if obtaining
 *           it or the merge call itself fails
 */
public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
    HRegionInfo region_b, boolean forcible) throws IOException {
  if (server == null) {
    throw new NullPointerException("Passed server is null");
  }
  if (region_a == null || region_b == null) {
    throw new NullPointerException("Passed region is null");
  }

  final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
  if (rsAdmin == null) {
    final String failure = "Attempting to send MERGE REGIONS RPC to server "
        + server.toString() + " for region "
        + region_a.getRegionNameAsString() + ","
        + region_b.getRegionNameAsString()
        + " failed because no RPC connection found to this server";
    throw new IOException(failure);
  }

  ProtobufUtil.mergeRegions(rsAdmin, region_a, region_b, forcible);
}
/**
 * Check if a region server is reachable and has the expected start code.
 * <p>
 * Attempts are governed by a retry counter created from {@code pingRetryCounterFactory}.
 * After an {@code IOException} the method waits via the counter before trying again. If the
 * calling thread is interrupted during that wait, the interrupt status is restored and the
 * method gives up immediately with {@code false} instead of continuing to retry.
 *
 * @param server server to check; must not be null
 * @return true if server info was obtained, it carries a server name, and that name's start
 *         code equals the start code of {@code server}; false otherwise, including when the
 *         retries run out or the thread is interrupted
 * @throws NullPointerException if {@code server} is null
 */
public boolean isServerReachable(ServerName server) {
  if (server == null) {
    throw new NullPointerException("Passed server is null");
  }

  RetryCounter retryCounter = pingRetryCounterFactory.create();
  while (retryCounter.shouldRetry()) {
    try {
      AdminService.BlockingInterface admin = getRsAdmin(server);
      if (admin != null) {
        ServerInfo info = ProtobufUtil.getServerInfo(admin);
        return info != null && info.hasServerName()
          && server.getStartcode() == info.getServerName().getStartCode();
      }
    } catch (IOException ioe) {
      LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes()
        + " of " + retryCounter.getMaxAttempts(), ioe);
      try {
        retryCounter.sleepUntilNextRetry();
      } catch (InterruptedException ie) {
        // Restore the interrupt status for the caller and stop retrying. Looping on with the
        // flag set would presumably make every later sleep fail at once, turning the
        // back-off into a busy loop.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
  return false;
}
@Test public void testReportBadSink() { ServerName serverNameA = mock(ServerName.class); ServerName serverNameB = mock(ServerName.class); when(replicationEndpoint.getRegionServers()) .thenReturn(Lists.newArrayList(serverNameA, serverNameB)); sinkManager.chooseSinks(); // Sanity check assertEquals(1, sinkManager.getSinks().size()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, sinkManager.getSinks().size()); }
/**
 * Helper that closes a region, identified by its region name, over the admin protocol.
 * <p>
 * The RPC is issued with a {@code null} controller.
 *
 * @param admin admin interface of the region server to call
 * @param server server name used when building the close request
 * @param regionName name of the region to close
 * @param versionOfClosingNode version value placed in the close request
 * @param destinationServer destination server placed in the close request
 * @param transitionInZK ZK-transition flag placed in the close request
 * @return true if the region is closed
 * @throws IOException as converted from the {@code ServiceException} raised by the RPC
 */
public static boolean closeRegion(final AdminService.BlockingInterface admin,
    final ServerName server, final byte[] regionName,
    final int versionOfClosingNode, final ServerName destinationServer,
    final boolean transitionInZK) throws IOException {
  final CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(
      server, regionName, versionOfClosingNode, destinationServer, transitionInZK);
  try {
    return ResponseConverter.isClosed(admin.closeRegion(null, request));
  } catch (ServiceException se) {
    throw getRemoteException(se);
  }
}
/** * For expert-admins. Runs close on the regionserver. Closes a region based on * the encoded region name. The region server name is mandatory. If the * servername is provided then based on the online regions in the specified * regionserver the specified region will be closed. The master will not be * informed of the close. Note that the regionname is the encoded regionname. * * @param encodedRegionName * The encoded region name; i.e. the hash that makes up the region * name suffix: e.g. if regionname is * <code>TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396.</code> * , then the encoded region name is: * <code>527db22f95c8a9e0116f0cc13c680396</code>. * @param serverName * The servername of the regionserver. A server name is made of host, * port and startcode. This is mandatory. Here is an example: * <code> host187.example.com,60020,1289493121758</code> * @return true if the region was closed, false if not. * @throws IOException * if a remote or network exception occurs */ @Override public boolean closeRegionWithEncodedRegionName(final String encodedRegionName, final String serverName) throws IOException { if (null == serverName || ("").equals(serverName.trim())) { throw new IllegalArgumentException( "The servername cannot be null or empty."); } ServerName sn = ServerName.valueOf(serverName); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); // Close the region without updating zk state. CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(sn, encodedRegionName, false); try { CloseRegionResponse response = admin.closeRegion(null, request); boolean isRegionClosed = response.getClosed(); if (false == isRegionClosed) { LOG.error("Not able to close the region " + encodedRegionName + "."); } return isRegionClosed; } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } }
/**
 * Stops the designated region server.
 * <p>
 * The stop request is sent with a {@code null} controller.
 *
 * @param hostnamePort Hostname and port delimited by a <code>:</code> as in
 *          <code>example.org:1234</code>
 * @throws IOException if a remote or network exception occurs
 */
@Override
public synchronized void stopRegionServer(final String hostnamePort)
    throws IOException {
  final String host = Addressing.parseHostname(hostnamePort);
  final int rsPort = Addressing.parsePort(hostnamePort);
  // The server name is built with a start code of 0.
  final ServerName target = ServerName.valueOf(host, rsPort, 0);
  final AdminService.BlockingInterface rsAdmin = this.connection.getAdmin(target);

  final StopServerRequest stopRequest = RequestConverter.buildStopServerRequest(
      "Called by admin client " + this.connection.toString());
  try {
    rsAdmin.stopServer(null, stopRequest);
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
/**
 * {@inheritDoc}
 * <p>
 * Resolves the region and its hosting server with {@code getRegion}, then asks that server
 * for the region info with the compaction-state flag set to {@code true}. The RPC is issued
 * with a {@code null} controller.
 *
 * @throws IllegalArgumentException if {@code getRegion} returns null for the name
 * @throws NoServerForRegionException if the resolved region has no server
 * @throws IOException as converted from a {@code ServiceException} raised in this method
 */
@Override
public CompactionState getCompactionStateForRegion(final byte[] regionName)
    throws IOException {
  try {
    final Pair<HRegionInfo, ServerName> located = getRegion(regionName);
    if (located == null) {
      throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
    }
    final ServerName hostingServer = located.getSecond();
    if (hostingServer == null) {
      throw new NoServerForRegionException(Bytes.toStringBinary(regionName));
    }

    final AdminService.BlockingInterface rsAdmin = this.connection.getAdmin(hostingServer);
    final GetRegionInfoRequest infoRequest = RequestConverter.buildGetRegionInfoRequest(
        located.getFirst().getRegionName(), true);
    return rsAdmin.getRegionInfo(null, infoRequest).getCompactionState();
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
@Override // Nothing is done w/ the 'master' parameter. It is ignored. public AdminService.BlockingInterface getAdmin(final ServerName serverName, final boolean master) throws IOException { if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } String key = getStubKey(AdminService.BlockingInterface.class.getName(), serverName.getHostAndPort()); this.connectionLock.putIfAbsent(key, key); AdminService.BlockingInterface stub = null; synchronized (this.connectionLock.get(key)) { stub = (AdminService.BlockingInterface) this.stubs.get(key); if (stub == null) { BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, rpcTimeout); stub = AdminService.newBlockingStub(channel); this.stubs.put(key, stub); } } return stub; }
/**
 * Asks a region server, through the OPEN RPC, to open a single region.
 * <p>
 * The open is expected to succeed, but it can fail if the server has just crashed.
 *
 * @param server region server that should open the region
 * @param region region to open
 * @param versionOfOfflineNode version that must be present in the offline node when the
 *          region server moves the region from OFFLINE to another state
 * @param favoredNodes favored nodes passed along in the open request
 * @return the opening state decoded from the response, or
 *         {@link RegionOpeningState#FAILED_OPENING} when no admin interface is available
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised by the RPC
 */
public RegionOpeningState sendRegionOpen(final ServerName server,
    HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
    throws IOException {
  final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
  if (rsAdmin == null) {
    LOG.warn("Attempting to send OPEN RPC to server " + server.toString()
        + " failed because no RPC connection found to this server");
    return RegionOpeningState.FAILED_OPENING;
  }

  final OpenRegionRequest openRequest = RequestConverter.buildOpenRegionRequest(
      server, region, versionOfOfflineNode, favoredNodes);
  try {
    return ResponseConverter.getRegionOpeningState(rsAdmin.openRegion(null, openRequest));
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
/**
 * Asks a region server, through a single OPEN RPC, to open a batch of regions.
 * <p>
 * The open is expected to succeed, but it can fail if the server has just crashed.
 *
 * @param server region server that should open the regions
 * @param regionOpenInfos one triple per region to open (region info, an integer and a list of
 *          server names), handed to the request builder as-is
 * @return the list of opening states decoded from the response, or {@code null} when no
 *         admin interface is available for {@code server}
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised by the RPC
 */
public List<RegionOpeningState> sendRegionOpen(ServerName server,
    List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
    throws IOException {
  final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
  if (rsAdmin == null) {
    LOG.warn("Attempting to send OPEN RPC to server " + server.toString()
        + " failed because no RPC connection found to this server");
    // Callers see null (not an empty list) when the RPC could not even be attempted.
    return null;
  }

  final OpenRegionRequest openRequest =
      RequestConverter.buildOpenRegionRequest(regionOpenInfos);
  try {
    return ResponseConverter.getRegionOpeningStateList(rsAdmin.openRegion(null, openRequest));
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
/**
 * Check if a region server is reachable and has the expected start code.
 * <p>
 * The number of attempts is read from {@code hbase.master.maximum.ping.server.attempts}
 * (default 10) and is never less than one. Attempts that fail with an {@code IOException}
 * are logged at debug level and retried right away.
 *
 * @param server server to check; must not be null
 * @return true if server info was obtained, it carries a server name, and that name's start
 *         code equals the start code of {@code server}; false otherwise
 * @throws NullPointerException if {@code server} is null
 */
public boolean isServerReachable(ServerName server) {
  if (server == null) {
    throw new NullPointerException("Passed server is null");
  }

  final int configuredAttempts = master.getConfiguration().getInt(
      "hbase.master.maximum.ping.server.attempts", 10);
  final int attemptLimit = Math.max(1, configuredAttempts);

  int attempt = 0;
  while (attempt < attemptLimit) {
    try {
      final AdminService.BlockingInterface rsAdmin = getRsAdmin(server);
      if (rsAdmin != null) {
        final ServerInfo reported = ProtobufUtil.getServerInfo(rsAdmin);
        if (reported == null || !reported.hasServerName()) {
          return false;
        }
        return server.getStartcode() == reported.getServerName().getStartCode();
      }
    } catch (IOException ioe) {
      LOG.debug("Couldn't reach " + server + ", try=" + attempt
          + " of " + attemptLimit, ioe);
    }
    attempt++;
  }
  return false;
}
/**
 * Contacts a region server and waits up to hbase.hbck.close.timeout ms
 * (default 120s) to close the region. This bypasses the active hmaster.
 * <p>
 * A failure of the close call itself is only logged. The method then polls the region server
 * once per second and returns as soon as the server no longer reports the region, or as soon
 * as a poll fails with an {@code IOException}.
 *
 * @param admin admin used to obtain the connection and the configuration
 * @param server region server hosting the region
 * @param region region to close
 * @throws IOException if the region is still reported when the timeout expires, or if the
 *           admin interface for {@code server} cannot be obtained
 * @throws InterruptedException if interrupted while sleeping between polls
 */
public static void closeRegionSilentlyAndWait(HBaseAdmin admin,
    ServerName server, HRegionInfo region) throws IOException, InterruptedException {
  final AdminService.BlockingInterface rsAdmin = admin.getConnection().getAdmin(server);
  try {
    ProtobufUtil.closeRegion(rsAdmin, server, region.getRegionName(), false);
  } catch (IOException e) {
    LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
  }

  final long timeout = admin.getConfiguration()
      .getLong("hbase.hbck.close.timeout", 120000);
  final long deadline = timeout + System.currentTimeMillis();
  while (System.currentTimeMillis() < deadline) {
    try {
      if (ProtobufUtil.getRegionInfo(rsAdmin, region.getRegionName()) == null) {
        return;
      }
    } catch (IOException ioe) {
      // A failed lookup ends the wait as well.
      return;
    }
    Thread.sleep(1000);
  }

  throw new IOException("Region " + region + " failed to close within"
      + " timeout " + timeout);
}
/**
 * Replays a batch of WAL entries on the region server returned by {@code getLocation()}.
 * <p>
 * Does nothing when {@code entries} is empty.
 *
 * @param regionInfo region the entries belong to; not referenced by this method body
 * @param entries WAL entries to replay
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised inside the replay step
 * @throws ServiceException declared by the signature
 */
private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }

  final HLog.Entry[] batch = entries.toArray(new HLog.Entry[entries.size()]);
  final AdminService.BlockingInterface targetServer =
      conn.getAdmin(getLocation().getServerName());

  // The builder yields the protobuf request plus a cell scanner that rides on the controller.
  final Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);

  try {
    final PayloadCarryingRpcController rpcController =
        new PayloadCarryingRpcController(requestAndCells.getSecond());
    targetServer.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
@Test public void testReportBadSink() { ServerName serverNameA = mock(ServerName.class); ServerName serverNameB = mock(ServerName.class); when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID)).thenReturn( Lists.newArrayList(serverNameA, serverNameB)); sinkManager.chooseSinks(); // Sanity check assertEquals(1, sinkManager.getSinks().size()); SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); sinkManager.reportBadSink(sinkPeer); // Just reporting a bad sink once shouldn't have an effect assertEquals(1, sinkManager.getSinks().size()); }
/** * For expert-admins. Runs close on the regionserver. Closes a region based on * the encoded region name. The region server name is mandatory. If the * servername is provided then based on the online regions in the specified * regionserver the specified region will be closed. The master will not be * informed of the close. Note that the regionname is the encoded regionname. * * @param encodedRegionName * The encoded region name; i.e. the hash that makes up the region * name suffix: e.g. if regionname is * <code>TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396.</code> * , then the encoded region name is: * <code>527db22f95c8a9e0116f0cc13c680396</code>. * @param serverName * The servername of the regionserver. A server name is made of host, * port and startcode. This is mandatory. Here is an example: * <code> host187.example.com,60020,1289493121758</code> * @return true if the region was closed, false if not. * @throws IOException * if a remote or network exception occurs */ public boolean closeRegionWithEncodedRegionName(final String encodedRegionName, final String serverName) throws IOException { if (null == serverName || ("").equals(serverName.trim())) { throw new IllegalArgumentException( "The servername cannot be null or empty."); } ServerName sn = ServerName.valueOf(serverName); AdminService.BlockingInterface admin = this.connection.getAdmin(sn); // Close the region without updating zk state. CloseRegionRequest request = RequestConverter.buildCloseRegionRequest(sn, encodedRegionName, false); try { CloseRegionResponse response = admin.closeRegion(null, request); boolean isRegionClosed = response.getClosed(); if (false == isRegionClosed) { LOG.error("Not able to close the region " + encodedRegionName + "."); } return isRegionClosed; } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } }
/**
 * Roll the log writer. That is, start writing log messages to a new file.
 *
 * @param serverName
 *          The servername of the regionserver. A server name is made of host, port and
 *          startcode. This is mandatory. Here is an example:
 *          <code> host187.example.com,60020,1289493121758</code>
 * @return the regions the server reported as needing a flush, one byte array per region,
 *         copied from the response; the array is empty (never null) when the response lists
 *         no regions
 * @throws IOException if a remote or network exception occurs
 * @throws FailedLogCloseException declared by the signature
 */
public synchronized byte[][] rollHLogWriter(String serverName)
    throws IOException, FailedLogCloseException {
  final ServerName target = ServerName.valueOf(serverName);
  final AdminService.BlockingInterface rsAdmin = this.connection.getAdmin(target);
  final RollWALWriterRequest rollRequest = RequestConverter.buildRollWALWriterRequest();

  try {
    final RollWALWriterResponse rollResponse = rsAdmin.rollWALWriter(null, rollRequest);
    final int flushCount = rollResponse.getRegionToFlushCount();
    final byte[][] toFlush = new byte[flushCount][];
    for (int idx = 0; idx < flushCount; idx++) {
      toFlush[idx] = rollResponse.getRegionToFlush(idx).toByteArray();
    }
    return toFlush;
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
@Override // Nothing is done w/ the 'master' parameter. It is ignored. public AdminService.BlockingInterface getAdmin(final ServerName serverName, final boolean master) throws IOException { if (isDeadServer(serverName)) { throw new RegionServerStoppedException(serverName + " is dead."); } String key = getStubKey(AdminService.BlockingInterface.class.getName(), serverName.getHostAndPort()); this.connectionLock.putIfAbsent(key, key); AdminService.BlockingInterface stub = null; synchronized (this.connectionLock.get(key)) { stub = (AdminService.BlockingInterface)this.stubs.get(key); if (stub == null) { BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, user, this.rpcTimeout); stub = AdminService.newBlockingStub(channel); this.stubs.put(key, stub); } } return stub; }
/** * @param sn * @return Admin interface for the remote regionserver named <code>sn</code> * @throws IOException * @throws RetriesExhaustedException wrapping a ConnectException if failed */ @SuppressWarnings("deprecation") private AdminService.BlockingInterface getRsAdmin(final ServerName sn) throws IOException { AdminService.BlockingInterface admin = this.rsAdmins.get(sn); if (admin == null) { LOG.debug("New admin connection to " + sn.toString()); if (sn.equals(master.getServerName()) && master instanceof HRegionServer) { // A master is also a region server now, see HBASE-10569 for details admin = ((HRegionServer)master).getRSRpcServices(); } else { admin = this.connection.getAdmin(sn); } this.rsAdmins.put(sn, admin); } return admin; }
/**
 * Replays a batch of WAL entries on the region server returned by {@code getLocation()}.
 * <p>
 * Does nothing when {@code entries} is empty.
 *
 * @param regionInfo region the entries belong to; not referenced by this method body
 * @param entries WAL entries to replay
 * @throws IOException if the admin interface cannot be obtained, or as converted from the
 *           {@code ServiceException} raised by the replay call
 * @throws ServiceException declared by the signature
 */
private void replayToServer(HRegionInfo regionInfo, List<HLog.Entry> entries)
    throws IOException, ServiceException {
  if (entries.isEmpty()) {
    return;
  }

  final HLog.Entry[] batch = entries.toArray(new HLog.Entry[entries.size()]);
  final AdminService.BlockingInterface targetServer =
      conn.getAdmin(getLocation().getServerName());

  // The builder yields the protobuf request plus a cell scanner that rides on the controller.
  final Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> requestAndCells =
      ReplicationProtbufUtil.buildReplicateWALEntryRequest(batch);
  final PayloadCarryingRpcController rpcController =
      rpcControllerFactory.newController(requestAndCells.getSecond());

  try {
    targetServer.replay(rpcController, requestAndCells.getFirst());
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}