/* A bounded-buffer program for multiple consumers and multiple producers  
 * Created by Henry Walker, 2004
 * Last modified by Janet Davis, 26 September 2006
 */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/sem.h>
#include <sys/wait.h>
#include <unistd.h>          

/* Test-run configuration.
 * Each producer writes WRITER_LENGTH items and each consumer reads
 * READER_LENGTH items, so with the values below both sides total 60.
 * Keep the totals balanced when changing them: consumers that ask for more
 * items than are ever produced, or producers that create more than BUF_SIZE
 * surplus items, block on their semaphore forever.
 */
#define NUM_READERS 5  /* number of consumer processes to be spawned */
#define NUM_WRITERS 6  /* number of producer processes to be spawned */

#define BUF_SIZE 5            /* logical size of buffer, in ints */
/* size of shared memory in bytes: BUF_SIZE buffer slots plus the 'in' and
 * 'out' indices; fully parenthesized so it is safe inside any expression */
#define SHARED_MEM_SIZE ((BUF_SIZE+2)*sizeof(int))
#define READER_LENGTH 12  /* number of iterations for consumer in test run */
#define WRITER_LENGTH 10  /* number of iterations for producer in test run */

/* Indices into the semaphore set */
#define BUF_USED 0     /* counts buffer elements currently filled */
#define BUF_SPACE 1    /* counts buffer elements currently empty */
#define MUTEX 2        /* mutual exclusion for access to the buffer */
#define NUM_SEMS 3     /* number of semaphores in the set */

/* Allocate a new, private System V semaphore set.
 *   num_semaphores: how many semaphores the set should contain
 * Returns the identifier of the new set.  The semaphore values are not
 * set here; use sem_init for that.  If the set cannot be created, an
 * error is reported with perror and the process exits with status 1.
 */
int sem_create( int num_semaphores ) {
   /* 0600 = read/alter permission for the owning user only */
   const int set_id = semget( IPC_PRIVATE, num_semaphores, IPC_CREAT | 0600 );

   if ( set_id >= 0 ) {
     return set_id;
   }

   perror ("error in creating semaphore");
   exit (1);
}

/* Set one semaphore of a set to a given value.
 *   semid: identifier of the semaphore set (as returned by sem_create)
 *   index: which semaphore within the set to initialize
 *   value: the value to store in that semaphore
 * On failure an error is reported with perror and the process exits with
 * status 1.
 */
void sem_init( int semid, int index, int value ) {  
   /* semctl's optional fourth argument must be a union semun, not a bare
    * int.  Some systems leave it to the caller to declare the union, others
    * declare it in <sys/sem.h>; declaring it at block scope works for both. */
   union semun {
     int val;                /* value for SETVAL */
     struct semid_ds *buf;   /* buffer for IPC_STAT, IPC_SET (unused here) */
     unsigned short *array;  /* array for GETALL, SETALL (unused here) */
   } arg;

   arg.val = value;

   if ( semctl( semid, index, SETVAL, arg ) < 0 ) {
     perror( "error in initializing semaphore" );
     exit (1);
   }
}

/* Perform a wait (down, P) operation on a semaphore of given id and index.
 *   semid: identifier of the semaphore set
 *   index: which semaphore within the set to decrement
 * Blocks until the semaphore's value can be decreased by 1.  If the call is
 * interrupted by a signal it is retried; any other failure is reported with
 * perror and the process exits with status 1.
 */
void sem_wait( int semid, int index ) {

  struct sembuf sops[1];  /* only one semaphore operation to be executed */
   
  sops[0].sem_num = index;/* define operation on semaphore with given index */
  sops[0].sem_op  = -1;   /* subtract 1 to value for P operation */
  sops[0].sem_flg = 0;    /* type "man semop" in shell window for details */

  /* A blocked semop is not restarted automatically after a signal (this
   * includes being stopped and continued), so EINTR means "try again",
   * not "the semaphore is broken". */
  while ( semop( semid, sops, 1 ) < 0 ) {
    if ( errno == EINTR ) {
      continue;
    }
    perror ("error in semaphore operation");
    exit (1);
  }
}

/* Perform a signal (up, V) operation on a semaphore of given id and index.
 *   semid: identifier of the semaphore set
 *   index: which semaphore within the set to increment
 * Adds 1 to the semaphore's value.  On failure an error is reported with
 * perror and the process exits with status 1.
 */
void sem_signal( int semid, int index ) {
   struct sembuf up;       /* the single operation handed to semop */

   up.sem_flg = 0;         /* no special flags */
   up.sem_op  = 1;         /* V operation: raise the value by one */
   up.sem_num = index;     /* act on the requested semaphore of the set */

   if ( semop( semid, &up, 1 ) >= 0 ) {
     return;
   }

   perror( "error in semaphore operation" );
   exit( 1 );
}


/* Producer: place WRITER_LENGTH values into the shared circular buffer.
 *   my_id:  number identifying this producer; item k has value 100*my_id + k
 *   buffer: base of the shared buffer of BUF_SIZE ints
 *   in:     shared index of the next slot to fill
 *   out:    shared index of the next slot to empty (not used by this function)
 *   semid:  semaphore set holding BUF_USED, BUF_SPACE and MUTEX
 * For every item the producer waits for a free slot, then takes the mutex
 * while it stores the value and advances *in, and finally announces the new
 * item on BUF_USED.
 */
void producer( int my_id, int* buffer, int* in, int* out, int semid ) {
  int produced = 0;

  printf( "The producer process %d begins.\n", my_id );

  while ( produced < WRITER_LENGTH ) {
    const int item = 100*my_id + produced;
    int slot;

    sem_wait( semid, BUF_SPACE );   /* block until a slot is free */
    sem_wait( semid, MUTEX );       /* enter critical section */

    slot = *in;
    buffer[slot] = item;
    *in = (slot + 1) % BUF_SIZE;    /* wrap around at the end of the buffer */

    sem_signal( semid, MUTEX );     /* leave critical section */
    sem_signal( semid, BUF_USED );  /* one more item is available */

    produced++;
  }

  printf( "Writer %d done.\n", my_id );  
}

/* Consumer: remove READER_LENGTH values from the shared circular buffer and
 * print each one.
 *   my_id:  number identifying this consumer in the output
 *   buffer: base of the shared buffer of BUF_SIZE ints
 *   in:     shared index of the next slot to fill (not used by this function)
 *   out:    shared index of the next slot to empty
 *   semid:  semaphore set holding BUF_USED, BUF_SPACE and MUTEX
 * For every item the consumer waits for a filled slot, then takes the mutex
 * while it reads the value and advances *out, and finally announces the
 * freed slot on BUF_SPACE.  Printing happens outside the critical section.
 */
void consumer( int my_id, int* buffer, int* in, int* out, int semid ) {
  int taken;

  printf ("The consumer process %d begins.\n", my_id);

  for ( taken = 0; taken < READER_LENGTH; ++taken ) {
    int slot;
    int item;

    sem_wait( semid, BUF_USED );    /* block until an item is available */
    sem_wait( semid, MUTEX );       /* enter critical section */

    slot = *out;
    item = buffer[slot];
    *out = (slot + 1) % BUF_SIZE;   /* wrap around at the end of the buffer */

    sem_signal( semid, MUTEX );     /* leave critical section */
    sem_signal( semid, BUF_SPACE ); /* one more slot is free */

    printf ("Reader %d: item %2d == %2d\n", my_id, taken, item);

    /* pause now and then so the consumers' output interleaves */
    if ( (taken+my_id)%5 != 0 ) {
      continue;
    }
    sleep(1);
  }

  printf ("Reader %d done.\n", my_id);
}

/* Program entry point.
 * Maps a shared region holding the circular buffer plus its 'in' and 'out'
 * indices, creates and initializes the three semaphores, forks NUM_WRITERS
 * producers and NUM_READERS consumers, waits for all of them, and finally
 * removes the semaphore set from the system.
 * Exits with status 0 on success and 1 if any setup or cleanup step fails.
 */
int main( void ) {
  pid_t pid;          /* variable to record process id of child */

  /* shared memory elements */
  void *shared_memory;   /* shared memory base address */
  int *in;         /* pointer to logical 'in' address for producer */
  int *out;        /* pointer to logical 'out' address for consumer */
  int *buffer;     /* logical base address for buffer */

  /* semaphore elements */
  int semid;       /* identifier for a semaphore set */

  /* local variables */
  int p_count;     

  /* set up shared memory segment; MAP_SHARED makes it visible to the
   * children created by fork below */
  shared_memory = mmap( NULL, SHARED_MEM_SIZE, 
                        PROT_READ | PROT_WRITE, 
                        MAP_ANON | MAP_SHARED,
                        -1, 0 );
  if ( shared_memory == MAP_FAILED ) {
    perror ("error in mmap while allocating shared memory");
    exit (1);
  }

  /* Lay out the segment as BUF_SIZE+2 ints:
   *   [0 .. BUF_SIZE-1]  the buffer slots
   *   [BUF_SIZE]         the 'in' index
   *   [BUF_SIZE+1]       the 'out' index
   * Arithmetic on an int* already counts in ints, so the offsets must not
   * be multiplied by sizeof(int). */
  buffer = (int*) shared_memory;
  in  = buffer + BUF_SIZE;
  out = buffer + BUF_SIZE + 1;

  *in = *out = 0;          /* initial starting points */

  /* create and initialize semaphores */
  semid = sem_create( NUM_SEMS );
  sem_init( semid, BUF_USED, 0 );          /* nothing in the buffer yet */
  sem_init( semid, BUF_SPACE, BUF_SIZE );  /* every slot is free */
  sem_init( semid, MUTEX, 1 );             /* buffer is unlocked */

  /* spawn producer processes */
  for( p_count = 1; p_count <= NUM_WRITERS; p_count++ ) {
    if ( -1 == (pid = fork()) )  {
      perror ("error in fork");  
      /* The semaphore set outlives the process unless removed.  Removing
       * it also makes semop fail in any children already started, so they
       * terminate instead of blocking forever. */
      semctl( semid, 0, IPC_RMID );
      exit (1);
    }

    if ( 0 == pid ) {
      /* processing for child == producer */
      producer( p_count, buffer, in, out, semid );
      exit(0);
    } 
  }
  
  /* spawn consumer processes */
  for ( p_count = 1; p_count <= NUM_READERS; p_count++ ) {
    if ( -1 == (pid = fork()) ) { 
      perror( "error in fork" );  
      semctl( semid, 0, IPC_RMID );  /* same cleanup as above */
      exit( 1 );
    }

    if ( 0 == pid ) {
      /* processing for child == consumer */
      consumer( p_count, buffer, in, out, semid );
      exit(0);
    }
  }

  /* wait for all children to finish */
  printf("All child processes spawned by parent\n");
  printf("Parent waiting for children to finish\n");

  /* Count only children that were actually reaped: an interrupted wait is
   * retried, and any other failure (such as no children left) ends the
   * loop instead of being mistaken for a finished child. */
  p_count = 0;
  while ( p_count < NUM_WRITERS+NUM_READERS ) {
    if ( wait(NULL) >= 0 ) {
      p_count++;
    } else if ( errno != EINTR ) {
      perror ("error in wait");
      break;
    }
  }

  /* Remove the semaphore from the system and destroy the set of 
   *  semaphores and data structure associated with it. */  
  if ( semctl( semid, 0, IPC_RMID) < 0 ){
    perror ("error in removing semaphore from the system"); 
    exit (1); 
  }
  printf ("Semaphore cleanup complete.\n");
   
  exit (0);
}
