@Override public void cancel() { LOG.info("Request to stop container {}.", container.getId()); StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class); stopRequest.setContainerId(container.getId()); try { manager.stopContainer(stopRequest); while (true) { GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class); statusRequest.setContainerId(container.getId()); GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest); LOG.trace("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics()); if (statusResponse.getStatus().getState() == ContainerState.COMPLETE) { break; } Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); } LOG.info("Container {} stopped.", container.getId()); } catch (YarnRemoteException e) { LOG.error("Fail to stop container {}", container.getId(), e); throw Throwables.propagate(e); } }
@Override public void cancel() { LOG.info("Request to stop container {}.", container.getId()); StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class); stopRequest.setContainerId(container.getId()); try { manager.stopContainer(stopRequest); boolean completed = false; while (!completed) { GetContainerStatusRequest statusRequest = Records.newRecord(GetContainerStatusRequest.class); statusRequest.setContainerId(container.getId()); GetContainerStatusResponse statusResponse = manager.getContainerStatus(statusRequest); LOG.info("Container status: {} {}", statusResponse.getStatus(), statusResponse.getStatus().getDiagnostics()); completed = (statusResponse.getStatus().getState() == ContainerState.COMPLETE); } LOG.info("Container {} stopped.", container.getId()); } catch (YarnRemoteException e) { LOG.error("Fail to stop container {}", container.getId(), e); throw Throwables.propagate(e); } }
@Override public synchronized void stopContainer() { if(isCompletelyDone()) { return; } if(this.state == ContainerState.PREP) { this.state = ContainerState.KILLED_BEFORE_LAUNCH; } else { LOG.info("KILLING " + containerID); ContainerManager proxy = null; try { proxy = getCMProxy(this.containerID, this.containerMgrAddress, this.containerToken); // kill the remote container if already launched StopContainerRequest stopRequest = Records .newRecord(StopContainerRequest.class); stopRequest.setContainerId(this.containerID); proxy.stopContainer(stopRequest); // If stopContainer returns without an error, assuming the stop made // it over to the NodeManager. // context.getEventHandler().handle( // new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT)); context.getResourceAllocator().removeContainer(containerID); } catch (Throwable t) { // ignore the cleanup failure String message = "cleanup failed for container " + this.containerID + " : " + StringUtils.stringifyException(t); // context.getEventHandler().handle( // new AMContainerEventStopFailed(containerID, message)); LOG.warn(message); this.state = ContainerState.DONE; return; } finally { if (proxy != null) { yarnRPC.stopProxy(proxy, conf); } } this.state = ContainerState.DONE; } }