Queue.java
001 /**
002  *
003  * All content copyright (c) 2003-2008 Terracotta, Inc.,
004  * except as may otherwise be noted in a separate copyright notice.
005  * All rights reserved.
006  *
007  */
008 package demo.sharedqueue;
009 
010 import java.net.InetAddress;
011 import java.net.UnknownHostException;
012 import java.util.Collections;
013 import java.util.LinkedList;
014 import java.util.List;
015 import java.util.ListIterator;
016 
017 /**
018  *  Description of the Class
019  *
020  *@author    Terracotta, Inc.
021  */
022 public class Queue {
023    private List queue = Collections.synchronizedList(new LinkedList());
024 
025    private List workers = Collections.synchronizedList(new LinkedList());
026 
027    private List completedJobs = Collections.synchronizedList(new LinkedList());
028 
029    private int nextJobId;
030 
031    private int port;
032 
033    private static final int MAX_HISTORY_LENGTH = 15;
034 
035    private static final int MAX_QUEUE_LENGTH = 150;
036 
037    public Queue(int port) {
038       this.port = port;
039       this.nextJobId = 1;
040    }
041 
042    public final Job getJob() {
043       synchronized (queue) {
044          while (queue.size() == 0) {
045             try {
046                queue.wait();
047             }
048             catch (InterruptedException e) {
049                throw new RuntimeException(e);
050             }
051          }
052          return (Jobqueue.remove(0);
053       }
054    }
055 
056    public final String getXmlData() {
057       // the list of jobs in the queue
058       String data = "<workqueue>";
059       synchronized (queue) {
060          ListIterator i = queue.listIterator();
061          while (i.hasNext()) {
062             Job job = (Jobi.next();
063             data += job.toXml();
064          }
065       }
066       data += "</workqueue>";
067 
068       // the list of completed jobs
069       data += "<completed>";
070       synchronized (completedJobs) {
071          ListIterator i = completedJobs.listIterator();
072          while (i.hasNext()) {
073             Job job = (Jobi.next();
074             data += job.toXml();
075          }
076       }
077       data += "</completed>";
078 
079       // the list of registered job consumers
080       data += "<consumers>";
081       synchronized (workers) {
082          ListIterator i = workers.listIterator();
083          while (i.hasNext()) {
084             Worker worker = (Workeri.next();
085             data += worker.toXml();
086          }
087       }
088       data += "</consumers>";
089       return data;
090    }
091 
092    public final Worker getWorker(String nodeId) {
093       synchronized (workers) {
094          ListIterator i = workers.listIterator();
095          while (i.hasNext()) {
096             Worker worker = (Workeri.next();
097             if (worker.getNodeId().equals(nodeId)) {
098                return worker;
099             }
100          }
101       }
102       return null;
103    }
104 
105    public final Worker createWorker(String nodeId) {
106       synchronized (workers) {
107          Worker worker = new Worker(this, port, nodeId);
108          workers.add(worker);
109          Thread t = new Thread(worker);
110          t.setDaemon(true);
111          t.start();
112          return worker;
113       }
114    }
115 
116    public final void log(Job job) {
117       synchronized (completedJobs) {
118          completedJobs.add(0, job);
119          if (completedJobs.size() > MAX_HISTORY_LENGTH) {
120             completedJobs.remove(completedJobs.size() 1);
121          }
122       }
123    }
124 
125    public final void reap() {
126       synchronized (workers) {
127          ListIterator i = workers.listIterator();
128          while (i.hasNext()) {
129             Worker worker = (Workeri.next();
130             if (worker.expire()) {
131                i.remove();
132             }
133          }
134       }
135    }
136 
137    public final void addJob() {
138       synchronized (queue) {
139          if (queue.size() < MAX_QUEUE_LENGTH) {
140             Job job = new Job(Queue.getHostName() " " this.port,
141                   this.nextJobId);
142             this.nextJobId = this.nextJobId < 999 this.nextJobId + 1;
143             queue.add(job);
144             queue.notifyAll();
145          }
146       }
147    }
148 
149    public final void addJob(Job job) {
150       synchronized (queue) {
151          queue.add(job);
152          queue.notifyAll();
153       }
154    }
155 
156    public static final String getHostName() {
157       try {
158          final InetAddress addr = InetAddress.getLocalHost();
159          return addr.getHostName();
160       }
161       catch (UnknownHostException e) {
162          return "Unknown";
163       }
164    }
165 }