private void cancelKey(final SelectionKey key) { if ( log.isTraceEnabled() ) log.trace("Adding key for cancel event:"+key); ObjectReader reader = (ObjectReader)key.attachment(); if ( reader != null ) { reader.setCancelled(true); reader.finish(); } Runnable cx = new Runnable() { @Override public void run() { if ( log.isTraceEnabled() ) log.trace("Cancelling key:"+key); NioReceiver.cancelledKey(key); } }; receiver.addEvent(cx); }
private void cancelKey(final SelectionKey key) { if (log.isTraceEnabled()) log.trace("Adding key for cancel event:" + key); ObjectReader reader = (ObjectReader) key.attachment(); if (reader != null) { reader.setCancelled(true); reader.finish(); } Runnable cx = new Runnable() { @Override public void run() { if (log.isTraceEnabled()) log.trace("Cancelling key:" + key); NioReceiver.cancelledKey(key); } }; receiver.addEvent(cx); }
protected void registerForRead(final SelectionKey key, ObjectReader reader) { if ( log.isTraceEnabled() ) log.trace("Adding key for read event:"+key); reader.finish(); //register our OP_READ interest Runnable r = new Runnable() { @Override public void run() { try { if (key.isValid()) { // cycle the selector so this key is active again key.selector().wakeup(); // resume interest in OP_READ, OP_WRITE int resumeOps = key.interestOps() | SelectionKey.OP_READ; key.interestOps(resumeOps); if ( log.isTraceEnabled() ) log.trace("Registering key for read:"+key); } } catch (CancelledKeyException ckx ) { NioReceiver.cancelledKey(key); if ( log.isTraceEnabled() ) log.trace("CKX Cancelling key:"+key); } catch (Exception x) { log.error("Error registering key for read:"+key,x); } } }; receiver.addEvent(r); }
protected void socketTimeouts() { long now = System.currentTimeMillis(); if ( (now-lastCheck) < getSelectorTimeout() ) return; //timeout Selector tmpsel = this.selector.get(); Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null; if ( keys == null ) return; for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) { SelectionKey key = iter.next(); try { // if (key.interestOps() == SelectionKey.OP_READ) { // //only timeout sockets that we are waiting for a read from // ObjectReader ka = (ObjectReader) key.attachment(); // long delta = now - ka.getLastAccess(); // if (delta > (long) getTimeout()) { // cancelledKey(key); // } // } // else if ( key.interestOps() == 0 ) { //check for keys that didn't make it in. ObjectReader ka = (ObjectReader) key.attachment(); if ( ka != null ) { long delta = now - ka.getLastAccess(); if (delta > getTimeout() && (!ka.isAccessed())) { if (log.isWarnEnabled()) log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump"); ka.setLastAccess(now); //key.interestOps(SelectionKey.OP_READ); }//end if } else { cancelledKey(key); }//end if }//end if }catch ( CancelledKeyException ckx ) { cancelledKey(key); } } lastCheck = System.currentTimeMillis(); }
public void listen() throws Exception { if (doListen()) { log.warn("ServerSocket already started"); return; } setListen(true); while ( doListen() ) { Socket socket = null; if ( getTaskPool().available() < 1 ) { if ( log.isWarnEnabled() ) log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up."); } BioReplicationTask task = (BioReplicationTask)getTaskPool().getRxTask(); if ( task == null ) continue; //should never happen try { socket = serverSocket.accept(); }catch ( Exception x ) { if ( doListen() ) throw x; } if ( !doListen() ) { task.setDoRun(false); task.serviceSocket(null,null); getExecutor().execute(task); break; //regular shutdown } if ( socket == null ) continue; socket.setReceiveBufferSize(getRxBufSize()); socket.setSendBufferSize(getTxBufSize()); socket.setTcpNoDelay(getTcpNoDelay()); socket.setKeepAlive(getSoKeepAlive()); socket.setOOBInline(getOoBInline()); socket.setReuseAddress(getSoReuseAddress()); socket.setSoLinger(getSoLingerOn(),getSoLingerTime()); socket.setSoTimeout(getTimeout()); ObjectReader reader = new ObjectReader(socket); task.serviceSocket(socket,reader); getExecutor().execute(task); }//while }
protected void execute(ObjectReader reader) throws Exception{ int pkgcnt = reader.count(); if ( pkgcnt > 0 ) { ChannelMessage[] msgs = reader.execute(); for ( int i=0; i<msgs.length; i++ ) { /** * Use send ack here if you want to ack the request to the remote * server before completing the request * This is considered an asynchronized request */ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); try { //process the message getCallback().messageDataReceived(msgs[i]); /** * Use send ack here if you want the request to complete on this * server before sending the ack to the remote server * This is considered a synchronized request */ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); }catch ( Exception x ) { if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND); log.error("Error thrown from messageDataReceived.",x); } if ( getUseBufferPool() ) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); msgs[i].setMessage(null); } } } }
/** * Called to initiate a unit of work by this worker thread on the provided * SelectionKey object. This method is synchronized, as is the run() method, * so only one key can be serviced at a given time. Before waking the worker * thread, and before returning to the main selection loop, this key's * interest set is updated to remove OP_READ. This will cause the selector * to ignore read-readiness for this channel while the worker thread is * servicing it. */ public synchronized void serviceChannel(SelectionKey key) { if (log.isTraceEnabled()) log.trace("About to service key:" + key); ObjectReader reader = (ObjectReader) key.attachment(); if (reader != null) reader.setLastAccess(System.currentTimeMillis()); this.key = key; key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); }
protected void registerForRead(final SelectionKey key, ObjectReader reader) { if (log.isTraceEnabled()) log.trace("Adding key for read event:" + key); reader.finish(); // register our OP_READ interest Runnable r = new Runnable() { @Override public void run() { try { if (key.isValid()) { // cycle the selector so this key is active again key.selector().wakeup(); // resume interest in OP_READ, OP_WRITE int resumeOps = key.interestOps() | SelectionKey.OP_READ; key.interestOps(resumeOps); if (log.isTraceEnabled()) log.trace("Registering key for read:" + key); } } catch (CancelledKeyException ckx) { NioReceiver.cancelledKey(key); if (log.isTraceEnabled()) log.trace("CKX Cancelling key:" + key); } catch (Exception x) { log.error("Error registering key for read:" + key, x); } } }; receiver.addEvent(r); }
protected void execute(ObjectReader reader) throws Exception { int pkgcnt = reader.count(); if (pkgcnt > 0) { ChannelMessage[] msgs = reader.execute(); for (int i = 0; i < msgs.length; i++) { /** * Use send ack here if you want to ack the request to the * remote server before completing the request This is * considered an asynchronous request */ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); try { // process the message getCallback().messageDataReceived(msgs[i]); /** * Use send ack here if you want the request to complete on * this server before sending the ack to the remote server * This is considered a synchronized request */ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.ACK_COMMAND); } catch (Exception x) { if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(Constants.FAIL_ACK_COMMAND); log.error("Error thrown from messageDataReceived.", x); } if (getUseBufferPool()) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); msgs[i].setMessage(null); } } } }
protected void socketTimeouts() { long now = System.currentTimeMillis(); if ( (now-lastCheck) < getSelectorTimeout() ) return; //timeout Selector tmpsel = selector; Set<SelectionKey> keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null; if ( keys == null ) return; for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) { SelectionKey key = iter.next(); try { // if (key.interestOps() == SelectionKey.OP_READ) { // //only timeout sockets that we are waiting for a read from // ObjectReader ka = (ObjectReader) key.attachment(); // long delta = now - ka.getLastAccess(); // if (delta > (long) getTimeout()) { // cancelledKey(key); // } // } // else if ( key.interestOps() == 0 ) { //check for keys that didn't make it in. ObjectReader ka = (ObjectReader) key.attachment(); if ( ka != null ) { long delta = now - ka.getLastAccess(); if (delta > getTimeout() && (!ka.isAccessed())) { if (log.isWarnEnabled()) log.warn("Channel key is registered, but has had no interest ops for the last "+getTimeout()+" ms. (cancelled:"+ka.isCancelled()+"):"+key+" last access:"+new java.sql.Timestamp(ka.getLastAccess())+" Possible cause: all threads used, perform thread dump"); ka.setLastAccess(now); //key.interestOps(SelectionKey.OP_READ); }//end if } else { cancelledKey(key); }//end if }//end if }catch ( CancelledKeyException ckx ) { cancelledKey(key); } } lastCheck = System.currentTimeMillis(); }
@Override public synchronized void run() { if ( buffer == null ) { int size = getRxBufSize(); if (key.channel() instanceof DatagramChannel) { size = ChannelReceiver.MAX_UDP_SIZE; } if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { buffer = ByteBuffer.allocateDirect(size); } else { buffer = ByteBuffer.allocate(size); } } else { buffer.clear(); } if (key == null) { return; // just in case } if ( log.isTraceEnabled() ) log.trace("Servicing key:"+key); try { ObjectReader reader = (ObjectReader)key.attachment(); if ( reader == null ) { if ( log.isTraceEnabled() ) log.trace("No object reader, cancelling:"+key); cancelKey(key); } else { if ( log.isTraceEnabled() ) log.trace("Draining channel:"+key); drainChannel(key, reader); } } catch (Exception e) { //this is common, since the sockets on the other //end expire after a certain time. if ( e instanceof CancelledKeyException ) { //do nothing } else if ( e instanceof IOException ) { //dont spew out stack traces for IO exceptions unless debug is enabled. if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e); else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"]."); } else if ( log.isErrorEnabled() ) { //this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.",e); } cancelKey(key); } finally { } key = null; // done, ready for more, return to pool getTaskPool().returnWorker (this); }
public synchronized void serviceSocket(Socket socket, ObjectReader reader) { this.socket = socket; this.reader = reader; }
@Override public synchronized void run() { if (buffer == null) { int size = getRxBufSize(); if (key.channel() instanceof DatagramChannel) { size = ChannelReceiver.MAX_UDP_SIZE; } if ((getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { buffer = ByteBuffer.allocateDirect(size); } else { buffer = ByteBuffer.allocate(size); } } else { buffer.clear(); } if (key == null) { return; // just in case } if (log.isTraceEnabled()) log.trace("Servicing key:" + key); try { ObjectReader reader = (ObjectReader) key.attachment(); if (reader == null) { if (log.isTraceEnabled()) log.trace("No object reader, cancelling:" + key); cancelKey(key); } else { if (log.isTraceEnabled()) log.trace("Draining channel:" + key); drainChannel(key, reader); } } catch (Exception e) { // this is common, since the sockets on the other // end expire after a certain time. if (e instanceof CancelledKeyException) { // do nothing } else if (e instanceof IOException) { // dont spew out stack traces for IO exceptions unless debug is // enabled. if (log.isDebugEnabled()) log.debug( "IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed[" + e.getMessage() + "].", e); else log.warn( "IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed[" + e.getMessage() + "]."); } else if (log.isErrorEnabled()) { // this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.", e); } cancelKey(key); } finally { } key = null; // done, ready for more, return to pool getTaskPool().returnWorker(this); }
protected void socketTimeouts() { long now = System.currentTimeMillis(); if ((now - lastCheck) < getSelectorTimeout()) return; // timeout Selector tmpsel = this.selector.get(); Set<SelectionKey> keys = (isListening() && tmpsel != null) ? tmpsel.keys() : null; if (keys == null) return; for (Iterator<SelectionKey> iter = keys.iterator(); iter.hasNext();) { SelectionKey key = iter.next(); try { // if (key.interestOps() == SelectionKey.OP_READ) { // //only timeout sockets that we are waiting for a read from // ObjectReader ka = (ObjectReader) key.attachment(); // long delta = now - ka.getLastAccess(); // if (delta > (long) getTimeout()) { // cancelledKey(key); // } // } // else if (key.interestOps() == 0) { // check for keys that didn't make it in. ObjectReader ka = (ObjectReader) key.attachment(); if (ka != null) { long delta = now - ka.getLastAccess(); if (delta > getTimeout() && (!ka.isAccessed())) { if (log.isWarnEnabled()) log.warn("Channel key is registered, but has had no interest ops for the last " + getTimeout() + " ms. (cancelled:" + ka.isCancelled() + "):" + key + " last access:" + new java.sql.Timestamp(ka.getLastAccess()) + " Possible cause: all threads used, perform thread dump"); ka.setLastAccess(now); // key.interestOps(SelectionKey.OP_READ); } // end if } else { cancelledKey(key); } // end if } // end if } catch (CancelledKeyException ckx) { cancelledKey(key); } } lastCheck = System.currentTimeMillis(); }
public void listen() throws Exception { if (doListen()) { log.warn("ServerSocket already started"); return; } setListen(true); while (doListen()) { Socket socket = null; if (getTaskPool().available() < 1) { if (log.isWarnEnabled()) log.warn( "All BIO server replication threads are busy, unable to handle more requests until a thread is freed up."); } BioReplicationTask task = (BioReplicationTask) getTaskPool().getRxTask(); if (task == null) continue; // should never happen try { socket = serverSocket.accept(); } catch (Exception x) { if (doListen()) throw x; } if (!doListen()) { task.setDoRun(false); task.serviceSocket(null, null); getExecutor().execute(task); break; // regular shutdown } if (socket == null) continue; socket.setReceiveBufferSize(getRxBufSize()); socket.setSendBufferSize(getTxBufSize()); socket.setTcpNoDelay(getTcpNoDelay()); socket.setKeepAlive(getSoKeepAlive()); socket.setOOBInline(getOoBInline()); socket.setReuseAddress(getSoReuseAddress()); socket.setSoLinger(getSoLingerOn(), getSoLingerTime()); socket.setSoTimeout(getTimeout()); ObjectReader reader = new ObjectReader(socket); task.serviceSocket(socket, reader); getExecutor().execute(task); } // while }