/* File:      queue.c
 * Author:    Keith Dennison
 * Created:   8 October, 1996
 * Copyright: 1996 Functional Objects, Inc. All rights reserved.
 *
 * Tests the thread primitives by implementing a queue based on an
 * example in D-doc-lib!threads.doc
 *
 * This example implements a queue using a notifcation and lock, and
 * creates four threads - two which write to the queue, and two which
 * read from it.
 * The output when sorted according to the timestamps, should be a mixed list
 * of alphabetic and numeric characters. The letters and numbers should appear
 * in order.
 */

#include <assert.h>

#include <stdlib.h>

#include <stddef.h>

#include <stdio.h>

#include <string.h>

#include <stddef.h>

#include <errno.h>

#include <time.h>

#include <process.h>

#include <sys/types.h>

#include <windows.h>


#include "win32-threads-primitives.h"



/* Comment this definition out to see the difference */
#define SYNCHRONISE



typedef struct element {
	struct element *next;
	void *value;
} ELEMENT;

typedef struct queue {
	struct element *first;
	struct element *last;
	CONTAINER notification, lock;
} QUEUE;



/* User calls this to initialise the queue */
void q_initialise(QUEUE *queue)
{
	queue->first = queue->last = NULL;
}


/* Test for empty queue */
int q_empty(QUEUE *queue)
{
	return(queue->first == NULL);
}


/* Internal implementation of pushing something on the queue */
void q_push(QUEUE *queue, void *value)
{
	ELEMENT *element;

	element = malloc(sizeof(ELEMENT));
	assert(element != NULL);
	element->next = NULL;
	element->value = value;
	if (queue->first == NULL)
		queue->first = queue->last = element;
	else {
		queue->last->next = element;
		Sleep(0);
		queue->last = element;
	}
}


/* Internal implementation for popping something off the front of the queue */
void *q_pop(QUEUE *queue)
{
	ELEMENT *element;
	void *value;

	assert(queue->first != NULL);

	element = queue->first;
	value = element->value;
	if (element == queue->last)
		queue->first = queue->last = NULL;
	else
		queue->first = element->next;
	free(element);
	return(value);
}


/* User calls this to put something on the queue
 */
void put_on_queue(QUEUE *queue, void *value)
{
#ifdef SYNCHRONISE

	primitive_wait_for_simple_lock(&queue->lock);
	if (q_empty(queue)) {
		primitive_release_all_notification(&queue->notification,
			&queue->lock);
	}
#endif


	q_push(queue, value);

#ifdef SYNCHRONISE

	primitive_release_simple_lock(&queue->lock);
#endif

}


/* User calls this to get something from the front of the queue. Returns a
 * timestamp to indicate when the item was removed as well as the value.
 */
void *get_from_queue(QUEUE *queue, DWORD *time)
{
	void *value;

#ifdef SYNCHRONISE

	primitive_wait_for_simple_lock(&queue->lock);
#endif


	while(q_empty(queue)) {
#ifdef SYNCHRONISE

		primitive_wait_for_notification(&queue->notification,
			&queue->lock);
#endif

	}
	*time = GetTickCount();
	value = q_pop(queue);

#ifdef SYNCHRONISE

	primitive_release_simple_lock(&queue->lock);
#endif


	return(value);
}



/* The queue which is shared by all threads */
QUEUE q;
DWORD start;

/* There are two threads which write to the queue. The first adds alphabetic
 * characters in order. the second adds numerals in order.
 */
Z feeder1(Z o, int n, ...)
{
	char c;

	for(c = 'a'; c <= 'z'; c++) {
		put_on_queue(&q, (void *)c);
		Sleep(rand() % 50);
	}
	put_on_queue(&q, (void *)('\0'));
	return(NULL);
}

Z feeder2(Z o, int n, ...)
{
	char c;

	for(c='0'; c <= '9'; c++) {
		put_on_queue(&q, (void *)c);
		Sleep(rand() % 100);
	}
	put_on_queue(&q, (void *)('\0'));
	return(NULL);
}

/* And there are two threads which read from the queue. Each pops an element
 * from the queue and sends it to standard output with a timestamp.
 */

LONG counter = 0;

Z reader1(Z o, int n, ...)
{
	DWORD time;
	char ch;

	do {
		ch = (char)get_from_queue(&q, &time);
		if (ch == '\0')
			InterlockedIncrement(&counter);
		else
			printf("%010d:%c\n", time-start, ch);
		Sleep(rand() % 100);
	} while (counter < 1);
	return(NULL);
}

Z reader2(Z o, int n, ...)
{
	DWORD time;
	char ch;
	int counter = 0;

	do {
		ch = (char)get_from_queue(&q, &time);
		if (ch == '\0')
			InterlockedIncrement(&counter);
		else
			printf("%010d:%c\n", time-start, ch);
		Sleep(rand() % 50);
	} while (counter < 1);
	return(NULL);
}



main()
{
	DTHREAD thread;
	DTHREAD tfeeder1, tfeeder2, treader1, treader2;

    primitive_initialize_current_thread(&thread);

	q_initialise(&q);

	assert(primitive_make_simple_lock(&q.lock, NULL) == 1);
	assert(primitive_make_notification(&q.notification, NULL) == 1);

	start = GetTickCount();

	/* Start the threads */
	assert(primitive_make_thread(&tfeeder1, NULL, (ZINT)I(0), feeder1) == 1);
	assert(primitive_make_thread(&tfeeder2, NULL, (ZINT)I(0), feeder2) == 1);
	assert(primitive_make_thread(&treader1, NULL, (ZINT)I(0), reader1) == 1);
	assert(primitive_make_thread(&treader2, NULL, (ZINT)I(0), reader2) == 1);

	/* Wait for the threads to terminate */
	primitive_thread_join_single(&treader1);
	primitive_thread_join_single(&treader2);
	primitive_thread_join_single(&tfeeder1);
	primitive_thread_join_single(&tfeeder2);

	primitive_destroy_notification(&q.notification);
	primitive_destroy_simple_lock(&q.lock);
	return(0);
}


syntax highlighted by Code2HTML, v. 0.9.1