events - Java networking: evented Socket/InputStream -
i'm implementing event-oriented layer on java's sockets, , wondering if there way determine if there data pending read.
my normal approach read socket buffer, , call provided callbacks when buffer filled on given amount of bytes (which 0, if callback needs fired every time arrives), suspect java doing buffering me.
is available()
method of inputstream reliable this? should read()
, own buffering on top of socket? or there way?
shortly put, no. available()
not reliable (at least not me). recommend using java.nio.channels.socketchannel
connected selector
, selectionkey
. solution event-based, more complicated plain sockets.
for clients:
- construct socket channel (
socket
), open selector (selector = selector.open();
). - use non-blocking
socket.configureblocking(false);
- register selector connections
socket.register(selector, selectionkey.op_connect);
- connect
socket.connect(new inetsocketaddress(host, port));
- see if there new
selector.select();
- if "new" refers successful connection, register selector
op_read
; if "new" refers data available, read socket.
however, in order have asynchronous need set separate thread (despite socket being created non-blocked, thread block anyway) checks whether has arrived or not.
for servers, there serversocketchannel
, use op_accept
it.
for reference, code (client), should give hint:
private thread readingthread = new listeningthread(); /** * listening thread - reads messages in separate thread application not blocked. */ private class listeningthread extends thread { public void run() { running = true; try { while(!close) listen(); messenger.close(); } catch(connectexception ce) { donotifyconnectionfailed(ce); } catch(exception e) { // e.printstacktrace(); messenger.close(); } running = false; } } /** * connects host , port. * @param host host connect to. * @param port port of host machine connect to. */ public void connect(string host, int port) { try { socketchannel socket = socketchannel.open(); socket.configureblocking(false); socket.register(this.selector, selectionkey.op_connect); socket.connect(new inetsocketaddress(host, port)); } catch(ioexception e) { this.donotifyconnectionfailed(e); } } /** * waits event happen, processes , returns. * @throws ioexception when goes wrong. */ protected void listen() throws ioexception { // see if there new things going on this.selector.select(); // process events iterator<selectionkey> iter = selector.selectedkeys().iterator(); while(iter.hasnext()) { selectionkey key = iter.next(); iter.remove(); // check validity if(key.isvalid()) { // if connectable... if(key.isconnectable()) { // ...establish connection, make messenger, , notify socketchannel client = (socketchannel)key.channel(); // tricky, registering op_read earlier causes selector not wait incoming bytes, results in 100% cpu usage very, fast if(client!=null && client.finishconnect()) { client.register(this.selector, selectionkey.op_read); } } // if readable, tell messenger read bytes else if(key.isreadable() && (socketchannel)key.channel()==this.messenger.getsocket()) { // read message here } } } } /** * starts client. */ public void start() { // start reading thread if(!this.running) { this.readingthread = new listeningthread(); this.readingthread.start(); } } /** * tells client close @ nearest possible moment. */ public void close() { this.close = true; }
and server:
/** * constructs server. * @param port port listen to. * @param protocol protocol of messages. * @throws ioexception when goes wrong. */ public channelmessageserver(int port) throws ioexception { this.server = serversocketchannel.open(); this.server.configureblocking(false); this.server.socket().bind(new inetsocketaddress(port)); this.server.register(this.selector, selectionkey.op_accept); } /** * waits event, exits. * @throws ioexception when goes wrong. */ protected void listen() throws ioexception { // see if there new things going on this.selector.select(); // process events iterator<selectionkey> iter = selector.selectedkeys().iterator(); while(iter.hasnext()) { selectionkey key = iter.next(); // connected socket iter.remove(); if(key.isvalid()) this.process(key); } } /** * processes selection key. * @param key selectionkey. * @throws ioexception when wrong. */ protected void process(selectionkey key) throws ioexception { // if incoming connection if(key.isacceptable()) { // client socketchannel client = (((serversocketchannel)key.channel()).accept()); try { client.configureblocking(false); client.register(this.selector, selectionkey.op_read); } catch(exception e) { // catch } } // if readable, tell messenger read else if(key.isreadable()) { // read } }
hope helps.
Comments
Post a Comment