import Utilities.*;
import Synchronization.*;

class BoundedBuffer extends MyObject { // designed for a single
                      // producer thread and a single consumer thread
   private int numSlots = 0;
   private double[] buffer = null;
   private int putIn = 0, takeOut = 0;
   private CountingSemaphore elements = null;
   private CountingSemaphore spaces = null;

   public BoundedBuffer(int numSlots) {
      super("BoundedBuffer with " + numSlots + " slots");
      this.numSlots = numSlots;
      buffer = new double[numSlots];
      elements = new CountingSemaphore(0);
      spaces = new CountingSemaphore(numSlots);
   }

   public void deposit(double value) {
      P(spaces);
      buffer[putIn] = value;
      putIn = (putIn + 1) % numSlots;
      V(elements);
   }

   public double fetch() {
      double value;
      P(elements);
      value = buffer[takeOut];
      takeOut = (takeOut + 1) % numSlots;
      V(spaces);
      return value;
   }
}

class A extends MyObject implements Runnable {

   private int limit = 0;
   private BoundedBuffer ABbb = null;

   public A(int limit, BoundedBuffer ABbb) {
      this.limit = limit;
      this.ABbb = ABbb;
   }

   public void run() {
      double work;
      for (int i = 0; i < limit; i++) {
         work = random();
         System.out.println("A: work " + i + " ="  + work);
         work = Math.sqrt(work);      nap(1+(int)random(2000));
         ABbb.deposit(work);
      }
   }
}

class B extends MyObject implements Runnable {

   private int limit = 0;
   private BoundedBuffer ABbb = null;
   private BoundedBuffer BCbb = null;

   public B(int limit, BoundedBuffer ABbb, BoundedBuffer BCbb) {
      this.limit = limit;
      this.ABbb = ABbb;
      this.BCbb = BCbb;
   }

   public void run() {
      double work;
      for (int i = 0; i < limit; i++) {
         work = ABbb.fetch();
         work = Math.sin(work);      nap(1+(int)random(2000));
         System.out.println("      B: work " + i + " ="  + work);
         BCbb.deposit(work);
      }
   }
}

class C extends MyObject implements Runnable {

   private int limit = 0;
   private BoundedBuffer BCbb = null;

   public C(int limit, BoundedBuffer BCbb) {
      this.limit = limit;
      this.BCbb = BCbb;
   }

   public void run() {
      double work;
      for (int i = 0; i < limit; i++) {
         work = BCbb.fetch();
         work = work * work;      nap(1+(int)random(2000));
         System.out.println("            C: work " + i + " ="  + work);
      }
   }
}

class Piping extends MyObject {

   public static void main(String[] args) {

      // create the bounded buffers
      BoundedBuffer ABbb = new BoundedBuffer(4);
      BoundedBuffer BCbb = new BoundedBuffer(4);

      // start threads A, B, and C
      Thread a = new Thread(new A(9, ABbb));
      Thread b = new Thread(new B(9, ABbb, BCbb));
      Thread c = new Thread(new C(9, BCbb));
      a.start(); b.start(); c.start();
      System.out.println("All threads started");

      // wait for them to finish
      try {
         a.join(); b.join(); c.join();
      } catch (InterruptedException e) {}
      System.exit(0);
   }
}

/* ............... Example compile and run(s)

D:\>javac pipe.java

D:\>java Piping
All threads started
A: work 0 =0.525258
A: work 1 =0.98132
      B: work 0 =0.662946
            C: work 0 =0.439497
A: work 2 =0.814105
A: work 3 =0.946678
      B: work 1 =0.836364
            C: work 1 =0.699504
      B: work 2 =0.784741
A: work 4 =0.12548
            C: work 2 =0.615818
A: work 5 =0.293954
      B: work 3 =0.826563
            C: work 3 =0.683207
      B: work 4 =0.346869
A: work 6 =0.1689
A: work 7 =0.46644
            C: work 4 =0.120318
A: work 8 =0.224181
      B: work 5 =0.516001
      B: work 6 =0.399503
            C: work 5 =0.266257
      B: work 7 =0.631095
      B: work 8 =0.455984
            C: work 6 =0.159603
            C: work 7 =0.398281
            C: work 8 =0.207921
                                            ... end of example run(s)  */
