Java networking: evented Socket/InputStream


I'm implementing an event-oriented layer on top of Java's sockets, and I was wondering if there is a way to determine whether there is data pending to be read.

My normal approach would be to read from the socket into a buffer, and call the provided callbacks when the buffer is filled over a given amount of bytes (which could be 0, if the callback needs to be fired every time anything arrives), but I suspect Java is already doing the buffering for me.
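
The buffering scheme described above might look roughly like the following sketch. `ThresholdBuffer` and its callback are hypothetical names made up for illustration, not part of any library, and the sketch assumes that a threshold of 0 means "fire on every arrival".

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.function.Consumer;

/** Hypothetical helper: collects incoming bytes and fires a callback per full chunk. */
final class ThresholdBuffer {
    private final int threshold;                 // 0 means "fire on every arrival"
    private final Consumer<byte[]> callback;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    ThresholdBuffer(int threshold, Consumer<byte[]> callback) {
        if (threshold < 0) {
            throw new IllegalArgumentException("threshold must be >= 0");
        }
        this.threshold = threshold;
        this.callback = callback;
    }

    /** Feed bytes that were just read from the socket. */
    void append(byte[] data, int offset, int length) {
        if (length == 0) {
            return;
        }
        if (threshold == 0) {
            // No buffering requested: hand over whatever arrived.
            callback.accept(Arrays.copyOfRange(data, offset, offset + length));
            return;
        }
        pending.write(data, offset, length);
        if (pending.size() < threshold) {
            return;
        }
        // Fire once per complete chunk and keep the remainder for later.
        byte[] all = pending.toByteArray();
        int consumed = 0;
        while (all.length - consumed >= threshold) {
            callback.accept(Arrays.copyOfRange(all, consumed, consumed + threshold));
            consumed += threshold;
        }
        pending.reset();
        pending.write(all, consumed, all.length - consumed);
    }

    /** Number of bytes received but not yet delivered to the callback. */
    int pendingBytes() {
        return pending.size();
    }
}
```

With a threshold of 4, appending 6 bytes would fire the callback once and leave 2 bytes pending until more data arrives.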

Is the available() method of InputStream reliable for this? Should I just read() and do my own buffering on top of the socket? Or is there another way?

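Shortly put, no. available() is not reliable (at least it was not for me); it only returns an estimate of the number of bytes that can be read without blocking. I recommend using java.nio.channels.SocketChannel connected with a Selector and SelectionKey. This solution is somewhat event-based, but it is more complicated than just plain sockets.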

For clients:

  1. Construct a socket channel (socket) and open a selector (selector = Selector.open();).
  2. Use non-blocking mode: socket.configureBlocking(false);
  3. Register the selector for connections: socket.register(selector, SelectionKey.OP_CONNECT);
  4. Connect: socket.connect(new InetSocketAddress(host, port));
  5. See if there is anything new: selector.select();
  6. If the "new" refers to a successful connection, register the selector for OP_READ; if the "new" refers to data being available, just read from the socket.
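
The six steps above can be put together in one self-contained sketch. The class name `SelectorClientSketch`, the method `readFirstChunk`, the timeout parameter, the 1024-byte buffer and the UTF-8 decoding are all assumptions made for illustration; only the `java.nio` calls themselves come from the steps.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical demo class following the numbered steps. */
final class SelectorClientSketch {

    /** Connects, waits for the first chunk of data and returns it as text (null on EOF or timeout). */
    static String readFirstChunk(InetSocketAddress address, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();                        // step 1
             SocketChannel socket = SocketChannel.open()) {
            socket.configureBlocking(false);                             // step 2
            SelectionKey registration =
                    socket.register(selector, SelectionKey.OP_CONNECT);  // step 3
            if (socket.connect(address)) {                               // step 4
                // Local connections may complete immediately; skip straight to reading.
                registration.interestOps(SelectionKey.OP_READ);
            }
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;                                         // timed out
                }
                selector.select(remaining);                              // step 5
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {                                 // step 6
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isConnectable()) {
                        // Switch to OP_READ only once the connection is established.
                        if (socket.finishConnect()) {
                            key.interestOps(SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int n = socket.read(buffer);
                        if (n < 0) {
                            return null;                                 // peer closed the connection
                        }
                        if (n > 0) {
                            buffer.flip();
                            return StandardCharsets.UTF_8.decode(buffer).toString();
                        }
                    }
                }
            }
        }
    }
}
```

A call such as `SelectorClientSketch.readFirstChunk(new InetSocketAddress(host, port), 5000)` blocks the calling thread until something arrives or the timeout expires.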

However, in order to have it asynchronous you would need to set up a separate thread (despite the socket being created as non-blocking, the thread will block in select() anyway) that checks whether something has arrived or not.
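
A minimal sketch of such a thread is shown here. `SelectorLoop`, its handler callback and the `stop()` method are hypothetical names; the one detail worth noting is `Selector.wakeup()`, which makes a blocked `select()` return so the loop can notice the stop flag.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.function.Consumer;

/** Hypothetical runnable that owns the blocking select() call. */
final class SelectorLoop implements Runnable {
    private final Selector selector;
    private final Consumer<SelectionKey> handler;
    private volatile boolean stopRequested;      // written by another thread

    SelectorLoop(Selector selector, Consumer<SelectionKey> handler) {
        this.selector = selector;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            while (!stopRequested) {
                selector.select();               // blocks this thread only
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid()) {
                        handler.accept(key);     // dispatch the event
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Asks the loop to finish; wakeup() unblocks a select() that is in progress. */
    void stop() {
        stopRequested = true;
        selector.wakeup();
    }
}
```

The loop would typically be started with `new Thread(loop).start()`, leaving the application thread free.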

For servers, there is ServerSocketChannel, and you use OP_ACCEPT for it.

For reference, this is my code (client), which should give you a hint:

    import java.io.IOException;
    import java.net.ConnectException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;

    // Members of the client class; the fields selector, messenger, running
    // and close are declared elsewhere and are not shown here.

    private Thread readingThread = new ListeningThread();

    /**
     * Listening thread - reads messages in a separate thread so the application does not get blocked.
     */
    private class ListeningThread extends Thread {
        public void run() {
            running = true;
            try {
                while (!close) listen();
                messenger.close();
            }
            catch (ConnectException ce) {
                doNotifyConnectionFailed(ce);
            }
            catch (Exception e) {
                // e.printStackTrace();
                messenger.close();
            }
            running = false;
        }
    }

    /**
     * Connects to a host and port.
     * @param host Host to connect to.
     * @param port Port of the host machine to connect to.
     */
    public void connect(String host, int port) {
        try {
            SocketChannel socket = SocketChannel.open();
            socket.configureBlocking(false);
            socket.register(this.selector, SelectionKey.OP_CONNECT);
            socket.connect(new InetSocketAddress(host, port));
        }
        catch (IOException e) {
            this.doNotifyConnectionFailed(e);
        }
    }

    /**
     * Waits for an event to happen, processes it and then returns.
     * @throws IOException when something goes wrong.
     */
    protected void listen() throws IOException {
        // see if there are any new things going on
        this.selector.select();
        // process events
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            // check validity
            if (key.isValid()) {
                // if connectable...
                if (key.isConnectable()) {
                    // ...establish the connection, make a messenger, and notify everyone
                    SocketChannel client = (SocketChannel) key.channel();
                    // now this is tricky: registering for OP_READ earlier causes the selector
                    // not to wait for incoming bytes, which results in 100% CPU usage very, very fast
                    if (client != null && client.finishConnect()) {
                        client.register(this.selector, SelectionKey.OP_READ);
                    }
                }
                // if readable, tell the messenger to read bytes
                else if (key.isReadable() && (SocketChannel) key.channel() == this.messenger.getSocket()) {
                    // read message here
                }
            }
        }
    }

    /**
     * Starts the client.
     */
    public void start() {
        // start a reading thread
        if (!this.running) {
            this.readingThread = new ListeningThread();
            this.readingThread.start();
        }
    }

    /**
     * Tells the client to close at the nearest possible moment.
     */
    public void close() {
        this.close = true;
    }
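
The "read message here" placeholder is left open in the code above. One possible way to fill it, shown only as a sketch, is to drain the channel into a buffer until it reports that nothing more is available right now. `ChannelReads`, `Result` and `drain` are hypothetical names; the sketch assumes a non-blocking channel, where `read` returns 0 when no data is pending and -1 once the peer has closed the connection.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical helper for the readable branch of a selector loop. */
final class ChannelReads {

    /** Bytes collected by one drain() call, plus whether the peer closed the stream. */
    static final class Result {
        final byte[] data;
        final boolean endOfStream;

        Result(byte[] data, boolean endOfStream) {
            this.data = data;
            this.endOfStream = endOfStream;
        }
    }

    /** Reads until the channel has nothing more to offer right now, or reaches end of stream. */
    static Result drain(ReadableByteChannel channel, ByteBuffer scratch) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        boolean endOfStream = false;
        while (true) {
            scratch.clear();
            int n = channel.read(scratch);
            if (n < 0) {             // peer closed the connection
                endOfStream = true;
                break;
            }
            if (n == 0) {            // non-blocking channel: no more data pending
                break;
            }
            scratch.flip();
            byte[] chunk = new byte[scratch.remaining()];
            scratch.get(chunk);
            collected.write(chunk, 0, chunk.length);
        }
        return new Result(collected.toByteArray(), endOfStream);
    }
}
```

On a blocking channel the same loop would simply wait inside `read` until the stream ends, so it is only meant for channels configured with `configureBlocking(false)`.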

And the server:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;

    // Members of the ChannelMessageServer class; the fields server and selector
    // are declared elsewhere and are not shown here.

    /**
     * Constructs a server.
     * @param port Port to listen to.
     * @throws IOException when something goes wrong.
     */
    public ChannelMessageServer(int port) throws IOException {
        this.server = ServerSocketChannel.open();
        this.server.configureBlocking(false);
        this.server.socket().bind(new InetSocketAddress(port));
        this.server.register(this.selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Waits for an event, then exits.
     * @throws IOException when something goes wrong.
     */
    protected void listen() throws IOException {
        // see if there are any new things going on
        this.selector.select();
        // process events
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = iter.next();
            // do something with the connected socket
            iter.remove();
            if (key.isValid()) this.process(key);
        }
    }

    /**
     * Processes a selection key.
     * @param key SelectionKey.
     * @throws IOException when something is wrong.
     */
    protected void process(SelectionKey key) throws IOException {
        // if incoming connection
        if (key.isAcceptable()) {
            // get the client
            SocketChannel client = (((ServerSocketChannel) key.channel()).accept());
            try {
                client.configureBlocking(false);
                client.register(this.selector, SelectionKey.OP_READ);
            }
            catch (Exception e) {
                // catch
            }
        }
        // if readable, tell the messenger to read
        else if (key.isReadable()) {
            // read
        }
    }
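
For a version that can be run as-is, here is a self-contained sketch of the same accept/read structure. It fills the readable branch with a plain echo, which is an assumption for illustration and not what the original messenger does; `EchoServerSketch`, `port()` and `stop()` are hypothetical names, and the server binds to the loopback address only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/** Hypothetical echo server built on ServerSocketChannel and OP_ACCEPT. */
final class EchoServerSketch implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel server;
    private volatile boolean stopRequested;

    /** Pass 0 to let the operating system pick a free port. */
    EchoServerSketch(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    @Override
    public void run() {
        try {
            while (!stopRequested) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid()) {
                        process(key);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            closeEverything();
        }
    }

    private void process(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
            if (client == null) {
                return;                          // nothing pending after all
            }
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                if (client.read(buffer) < 0) {
                    client.close();              // peer closed; this also cancels the key
                    return;
                }
                buffer.flip();
                // Simplification: this spins if the send buffer is full.
                while (buffer.hasRemaining()) {
                    client.write(buffer);
                }
            } catch (IOException e) {
                client.close();                  // drop a broken connection, keep serving
            }
        }
    }

    private void closeEverything() {
        try {
            for (SelectionKey key : selector.keys()) {
                key.channel().close();
            }
            selector.close();
        } catch (IOException ignored) {
            // best effort during shutdown
        }
    }

    /** Asks the loop to finish and unblocks select(). */
    void stop() {
        stopRequested = true;
        selector.wakeup();
    }
}
```

The server would be run on its own thread, for example `new Thread(new EchoServerSketch(0)).start()`, for the same reason as the client: `select()` blocks the thread that calls it.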

Hope this helps.

