/* * Vorlesung "Vernetzte Systeme" WS 1999/2000, Prof. Dr. F. Mattern * ---------------------------------------------------------------- * PipeLineThread.java: * simulate lost/corrupted and delayed packages in one direction (either * up- or downstream) * * Required Classes: * - PipeLineSendThread: * delay sending data packet for fixed amount of time * - SyncPrintWriter: * Allows multiple threads to share a single output stream. */ import java.io.*; import java.net.*; public class PipeLineThread extends Thread { private String name; private BufferedReader in; private SyncPrintWriter out; private Socket socket; private int minDelay, maxDelay; private double rate; private boolean terminated = false; private boolean blocks = false; private boolean debug = false; private void msg (String s) { if (debug) System.err.println(s); } private void cmsg (String s) { if (debug) System.err.print(s); } private boolean lost (int bytes) { // figure out probablility that a packet gets lost. double badLevel = (Math.pow(10,-rate))*8*bytes; // msg ("Rate: "+ 8*bytes +" bits * " + Math.pow(10,-rate) + // "= "+ badLevel); return (Math.random() < badLevel); } private int totalDelay () { // figure out delay (in ms) return minDelay + (int)Math.round(Math.random()*(maxDelay-minDelay)); } public void terminate() { terminated = true; } private void handlePacket(String inputLine, SyncPrintWriter out) { /* does this packet get lost? */ cmsg (name + ": rcvd "+ inputLine.length() + " bytes."); if (!lost(inputLine.length())) { /* figure out delay */ int wait = totalDelay(); msg (" Delayed ("+ wait +" ms)."); /* start new thread to send out our data with delay */ PipeLineSendThread plts = new PipeLineSendThread(inputLine, out, wait); plts.start(); /* on SunOs we need to let the started thread get some cycles */ yield(); } else { msg (" Lost!"); } } public PipeLineThread (String name, BufferedReader in, SyncPrintWriter out, double rate, int minDelay, int maxDelay, boolean blocks, boolean debug) { super(name); /* call Thread(str) constructor */ this.name = name; this.in = in; this.out = out; /* how do I do bounds checking here?! 0 <= l <= 1!! */ this.rate = rate; this.minDelay = minDelay; /* 0 <= d */ this.maxDelay = maxDelay; /* 0 <= d? */ this.blocks = blocks; this.debug = debug; } /* Start listening on in and prepare delayed sendout to out */ public void run() { try { String inputLine; if (blocks) { while ((inputLine = in.readLine()) != null) { handlePacket(inputLine, out); // just in case any of our threats needs some cycles (SunOS) yield(); } } else { while (!terminated) { if (in.ready()) { if ((inputLine = in.readLine()) == null) { cmsg (name +": got [null]. Terminating..."); terminated = true; continue; } /* else */ handlePacket(inputLine, out); // just in case any of our threats needs some cycles (SunOS) yield(); } } } /* of if (blocks) */ msg (name + ": Input stream ended."); } catch (IOException e) { /* someone closed our socket or stream? Just end execution */ msg (name + ": Input stream closed."); // e.printStackTrace(); } } }