package edu.vt.marian.uip; import java.io.*; import java.net.*; import java.util.*; import edu.vt.marian.common.*; /** class name: server_uip_receiver class description: this class is responsible to communicate with one client_uip, send rpc_function to and receive rpc_function from it through sockets
uses the services of class(es): designer(s): Jianxin Zhao (jxzhao@csgrad.cs.vt.edu) implementator(s): finished time: known bugs: JDK version: 1.1.5 side effects: */ public class server_uip_receiver extends Thread { /** this is just used for debugging */ Debug debug; /** this id will identify this receiver from the receiver table */ private int client_id; /** this is a brief description of the client_uip this receiver responds to */ private String client_desc; /** this is the version of this receiver, it will affect the way rpc_functions are written to and read from stream */ private int version; /** this indicate the current status of this receiver, usually a receiver will be deleted if it's status is not ok for some time */ private int status; /** those are possible status */ public final static int OK = 0; public final static int INVALID_PARAMETER = 1; public final static int NEW_SESSION_ERROR = 2; public final static int ERROR_CONNECT_TO_CLIENT = 3; public final static int CLIENT_DISCONNECTED = 4; public final static int READ_ACK_ERROR = 5; public final static int SEND_FUNCTION_ERROR = 6; public final static int ACK_TIMEOUT = 7; public final static int WRITE_ERROR = 8; public final static int EXIT = 9; public final static int ERROR_CREATE_STREAM = 10; public final static int ERROR_SET_SOCKET_TIMEOUT = 11; public final static int READ_FUNCTION_ERROR = 12; /** those are possible method return values */ public final static int NAK = 1; /** this is the receiver table this receiver belongs to */ private server_uip_receiver_table surt; /** this socket is used to read functions sent from client uip */ private Socket inbound_socket; /** this socket is used to pass functions to client uip */ private Socket outbound_socket; /** those are the input/output streams of the above two sockets, put them here may save some time since we only need to generate them once in constructor\ */ private BufferedInputStream inbound_bis, outbound_bis; private BufferedOutputStream inbound_bos, outbound_bos; /** method description: this constructor will create a server_uip_receiver object based on the information in the parameters.
uses the services of class(es): input parameter(s): surt -- the server_uip_receiver_table which created this object client_id -- used to identify this object in the server_uip_receiver _table s -- socket used to communicate with the client_uip version -- specifies how to pass and receive rpc_functions through sockets debug -- used for debugging output parameter(s): none return value: none synchronization consideration: none */ public server_uip_receiver(server_uip_receiver_table surt, int client_id, Socket s, int version, Debug debug) { this.debug = debug; if (surt == null) { debug.dumpTrace("class server_uip_receiver, constructor, parameters server_uip_receiver_table or socket is null"); status = INVALID_PARAMETER; return; } this.surt = surt; this.client_id = client_id; inbound_socket = s; this.version = version; try { inbound_bis = new BufferedInputStream(inbound_socket.getInputStream()); inbound_bos = new BufferedOutputStream(inbound_socket.getOutputStream()); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, constructor, get streams from inbound socket failed"); status = ERROR_CREATE_STREAM; return; } // the client uip have to response within 60 seconds try { inbound_socket.setSoTimeout(60000); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, constructor, error set inbound socket time out value"); status = ERROR_SET_SOCKET_TIMEOUT; return; } rpc_function function = new rpc_function(inbound_bis, version, debug); // after the first function, no time requirement for other functions try { inbound_socket.setSoTimeout(0); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, constructor, error set inbound socket time out value"); status = ERROR_SET_SOCKET_TIMEOUT; return; } if ((! function.is_valid()) || (function.get_name() == null) || (! function.get_name().equals("new_session"))) { // only "new_session" function should be read out here debug.dumpTrace("class server_uip_receiver, constructor, reading function new_session error"); status = NEW_SESSION_ERROR; return; } // we need to send back ACK for this function here byte[] reply = (new String("ACK")).getBytes(); try { inbound_bos.write(reply, 0, 3); inbound_bos.flush(); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, constructor, error writing ACK"); status = WRITE_ERROR; return; } // get the client port number from the function parameter p = function.get_parameter(1); if ((p == null) || (! p.get_type().equals("INTEGER"))) { debug.dumpTrace("class server_uip_receiver, constructor, can't get port number from new_session"); status = NEW_SESSION_ERROR; return; } int port_number = ((Integer)p.get_value()).intValue(); if ((port_number <= 0) || (port_number >= 65536)) { debug.dumpTrace("class server_uip_receiver, constructor, port number invalid"); status = NEW_SESSION_ERROR; return; } // try to create outbound socket to client uip try { outbound_socket = new Socket(inbound_socket.getInetAddress(), port_number); outbound_bis = new BufferedInputStream( outbound_socket.getInputStream()); outbound_bos = new BufferedOutputStream( outbound_socket.getOutputStream()); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, constructor, error creating outbound_socket or get streams from it"); status = ERROR_CONNECT_TO_CLIENT; return; } // ACK need to be send back within 60 seconds after a function has been // passed try { outbound_socket.setSoTimeout(60000); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, constructor, error set outbound socket time out value"); status = ERROR_SET_SOCKET_TIMEOUT; return; } // create the description of the client uip client_desc = (inbound_socket.getInetAddress()).getHostName() + "--" + Integer.toString(port_number); status = OK; // write log -- a new receiver is created surt.log_data_from_server_uip_receiver( client_id, client_desc + " is connected"); // start to read functions from the client uip this.start(); return; } /** method description: this method of the thread will keep on reading rpc_functions from the socket connected to the client_uip.
uses the services of class(es): input parameter(s): none output parameter(s): none return value: none synchronization consideration: none */ public void run() { // implementation required while (true) { rpc_function function = new rpc_function(inbound_bis, version, debug); if (! function.is_valid()) { debug.dumpTrace("server_uip_receiver:run(): error reading function from client_uip."); if (status == OK) { status = READ_FUNCTION_ERROR; } return; } ////DEBUG debug.dumpTrace("server_uip_receiver:run(): received " + function.get_name() + "() function."); ////DEBUG parameter p = function.get_parameter(0); ////DEBUG if (p != null)debug.dumpTrace(" session: " + ((Integer) p.get_value()).toString()); // the function read out is valid, now send back ack or nak if ((version == 1) && (function.get_name() == null)) { // this is an unrecognized function byte[] reply = (new String("NAK")).getBytes(); try { inbound_bos.write(reply, 0, 3); inbound_bos.flush(); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, method run, error writing NAK"); if (status == OK) { status = WRITE_ERROR; } return; } } else { byte[] reply = (new String("ACK")).getBytes(); try { inbound_bos.write(reply, 0, 3); inbound_bos.flush(); } catch (Exception e) { debug.dumpTrace("class server_uip_receiver, method run, error writing ACK"); if (status == OK) { status = WRITE_ERROR; } return; } } //debug.dumpTrace("class server_uip_receiver, method run, ack send back to client"); if ((function.get_name() != null) && (function.get_name().equals("end_session"))) { // client request to end this connection status = CLIENT_DISCONNECTED; surt.log_data_from_server_uip_receiver( client_id, client_desc + " client disconnected"); return; } // this is a normal function, just pass it to server uip // receiver table to process surt.process_call_back_from_server_uip_receiver( client_id, client_desc, function); } // end -- while } /** method description: this method will pass the rpc_function to the client_uip through socket
uses the services of class(es): input parameter(s): rf -- the function need to be passed to the client_uip output parameter(s): none return value: none synchronization consideration: this is a synchronized method since it will write something to the socket and wait for reply */ public synchronized int rpc_call(rpc_function rf) { // implementation required if (rf.to_stream_in_xdr(outbound_bos, version) != rf.OK) { debug.dumpTrace("server_uip_receiver.rpc_call(): error writing function to client."); if (status == OK) { // we will not change the status if it's already not OK status = SEND_FUNCTION_ERROR; } return SEND_FUNCTION_ERROR; } // the writing is successful, now try to read out acknowdgement byte[] b = new byte[3]; try { int i0, i1, i2; i0 = outbound_bis.read(); i1 = outbound_bis.read(); i2 = outbound_bis.read(); if ((i0 == -1) || (i1 == -1) || (i2 == -1)) { // end of the stream has been reached debug.dumpTrace("server_uip_receiver:rpc_call(): error reading ACK."); if (status == OK) { status = READ_ACK_ERROR; } return READ_ACK_ERROR; } b[0] = (new Integer(i0)).byteValue(); b[1] = (new Integer(i1)).byteValue(); b[2] = (new Integer(i2)).byteValue(); } catch (Exception e) { debug.dumpTrace("server_uip_receiver:rpc_call(): error read ACK from client uip."); if (status == OK) { // we will not change the status if it's already not OK status = READ_ACK_ERROR; } return READ_ACK_ERROR; } String reply = new String(b); if (reply.equals("ACK")) { return OK; } // we didn't get ACK debug.dumpTrace("server_uip_receiver:rpc_call(): NAK received."); return NAK; } /** method description: this method will let the object exit smoothly.
uses the services of class(es): input parameter(s): condition --- under what condition is this method is called output parameter(s): none return value: 0 -- every thing is fine other -- synchronization consideration: none */ public int exit(String condition) { // first stop reading functions send from client uip this.stop(); if (status == OK) { // exit under normal condition, inform the client part rpc_function rf = new rpc_function(debug); rf.set_name("end_session"); rpc_call(rf); } status = EXIT; // close the two sockets anyway try { inbound_socket.close(); outbound_socket.close(); } catch (Exception e) {} // write log surt.log_data_from_server_uip_receiver( client_id, client_desc + ", exit under condition " + condition); return OK; } /** method description: this method will return the current status of this object
uses the services of class(es): input parameter(s): none output parameter(s): none return value: 0 -- every thing is fine other -- synchronization consideration: none */ public int get_status() { return status; } /** method description: this method will return the id of this object
uses the services of class(es): input parameter(s): none output parameter(s): none return value: the id of this object as an integer synchronization consideration: none */ public int get_client_id() { return client_id; } }