001 /**
002 *
003 * All content copyright (c) 2003-2008 Terracotta, Inc.,
004 * except as may otherwise be noted in a separate copyright notice.
005 * All rights reserved.
006 *
007 */
008 package demo.sharedqueue;
009
010 import java.net.InetAddress;
011 import java.net.UnknownHostException;
012 import java.util.Collections;
013 import java.util.LinkedList;
014 import java.util.List;
015 import java.util.ListIterator;
016
017 /**
018 * Description of the Class
019 *
020 *@author Terracotta, Inc.
021 */
022 public class Queue {
023 private List queue = Collections.synchronizedList(new LinkedList());
024
025 private List workers = Collections.synchronizedList(new LinkedList());
026
027 private List completedJobs = Collections.synchronizedList(new LinkedList());
028
029 private int nextJobId;
030
031 private int port;
032
033 private static final int MAX_HISTORY_LENGTH = 15;
034
035 private static final int MAX_QUEUE_LENGTH = 150;
036
037 public Queue(int port) {
038 this.port = port;
039 this.nextJobId = 1;
040 }
041
042 public final Job getJob() {
043 synchronized (queue) {
044 while (queue.size() == 0) {
045 try {
046 queue.wait();
047 }
048 catch (InterruptedException e) {
049 throw new RuntimeException(e);
050 }
051 }
052 return (Job) queue.remove(0);
053 }
054 }
055
056 public final String getXmlData() {
057 // the list of jobs in the queue
058 String data = "<workqueue>";
059 synchronized (queue) {
060 ListIterator i = queue.listIterator();
061 while (i.hasNext()) {
062 Job job = (Job) i.next();
063 data += job.toXml();
064 }
065 }
066 data += "</workqueue>";
067
068 // the list of completed jobs
069 data += "<completed>";
070 synchronized (completedJobs) {
071 ListIterator i = completedJobs.listIterator();
072 while (i.hasNext()) {
073 Job job = (Job) i.next();
074 data += job.toXml();
075 }
076 }
077 data += "</completed>";
078
079 // the list of registered job consumers
080 data += "<consumers>";
081 synchronized (workers) {
082 ListIterator i = workers.listIterator();
083 while (i.hasNext()) {
084 Worker worker = (Worker) i.next();
085 data += worker.toXml();
086 }
087 }
088 data += "</consumers>";
089 return data;
090 }
091
092 public final Worker getWorker(String nodeId) {
093 synchronized (workers) {
094 ListIterator i = workers.listIterator();
095 while (i.hasNext()) {
096 Worker worker = (Worker) i.next();
097 if (worker.getNodeId().equals(nodeId)) {
098 return worker;
099 }
100 }
101 }
102 return null;
103 }
104
105 public final Worker createWorker(String nodeId) {
106 synchronized (workers) {
107 Worker worker = new Worker(this, port, nodeId);
108 workers.add(worker);
109 Thread t = new Thread(worker);
110 t.setDaemon(true);
111 t.start();
112 return worker;
113 }
114 }
115
116 public final void log(Job job) {
117 synchronized (completedJobs) {
118 completedJobs.add(0, job);
119 if (completedJobs.size() > MAX_HISTORY_LENGTH) {
120 completedJobs.remove(completedJobs.size() - 1);
121 }
122 }
123 }
124
125 public final void reap() {
126 synchronized (workers) {
127 ListIterator i = workers.listIterator();
128 while (i.hasNext()) {
129 Worker worker = (Worker) i.next();
130 if (worker.expire()) {
131 i.remove();
132 }
133 }
134 }
135 }
136
137 public final void addJob() {
138 synchronized (queue) {
139 if (queue.size() < MAX_QUEUE_LENGTH) {
140 Job job = new Job(Queue.getHostName() + " " + this.port,
141 this.nextJobId);
142 this.nextJobId = this.nextJobId < 999 ? this.nextJobId + 1 : 1;
143 queue.add(job);
144 queue.notifyAll();
145 }
146 }
147 }
148
149 public final void addJob(Job job) {
150 synchronized (queue) {
151 queue.add(job);
152 queue.notifyAll();
153 }
154 }
155
156 public static final String getHostName() {
157 try {
158 final InetAddress addr = InetAddress.getLocalHost();
159 return addr.getHostName();
160 }
161 catch (UnknownHostException e) {
162 return "Unknown";
163 }
164 }
165 }
|