001 /**
002 *
003 * All content copyright (c) 2003-2008 Terracotta, Inc.,
004 * except as may otherwise be noted in a separate copyright notice.
005 * All rights reserved.
006 *
007 */
008 package demo.sharedqueue;
009
010 import java.util.Collections;
011 import java.util.LinkedList;
012 import java.util.List;
013 import java.util.ListIterator;
014
015 /**
016 * Description of the Class
017 *
018 *@author Terracotta, Inc.
019 */
020 class Worker implements Runnable {
021
022 private final String name;
023
024 private final int port;
025
026 private final Queue queue;
027
028 private final List jobs;
029
030 private final String nodeId;
031
032 private int health = HEALTH_ALIVE;
033
034 private static final int HEALTH_ALIVE = 0;
035
036 private static final int HEALTH_DYING = 1;
037
038 private static final int HEALTH_DEAD = 2;
039
040 private static final int MAX_LOAD = 10;
041
042 public Worker(Queue queue, int port, String nodeId) {
043 this.name = Queue.getHostName();
044 this.port = port;
045 this.queue = queue;
046 this.nodeId = nodeId;
047 jobs = Collections.synchronizedList(new LinkedList());
048 }
049
050 public final String getNodeId() {
051 return this.nodeId;
052 }
053
054 public final String getName() {
055 return "node: " + nodeId + " (" + name + ":" + port + ")";
056 }
057
058 public final String toXml() {
059 synchronized (jobs) {
060 String data = "<worker><name>" + getName() + "</name><jobs>";
061 ListIterator i = jobs.listIterator();
062 while (i.hasNext()) {
063 data += ((Job) i.next()).toXml();
064 }
065 data += "</jobs></worker>";
066 return data;
067 }
068 }
069
070 /**
071 * Attempt to mark the Worker as dead (if it's already dying); Note that
072 * we synchronize this method since it's mutating a shared object (this
073 * class)
074 *
075 *@return True if the Worker is dead.
076 */
077 public final synchronized boolean expire() {
078 if (HEALTH_DYING == health) {
079 // a dying Worker wont die until it has
080 // consumed all of it's jobs
081 if (jobs.size() > 0) {
082 queue.addJob((Job) jobs.remove(0));
083 }
084 else {
085 setHealth(HEALTH_DEAD);
086 }
087 }
088 return (HEALTH_DEAD == health);
089 }
090
091 /**
092 * Set the state of the Worker's health to dying; Note that we synchronize
093 * this method since it's mutating a shared object (this class)
094 *
095 */
096 public final synchronized void markForExpiration() {
097 setHealth(HEALTH_DYING);
098 }
099
100 public final void run() {
101 while (HEALTH_DEAD != health) {
102 if ((HEALTH_ALIVE == health) && (jobs.size() < MAX_LOAD)) {
103 final Job job = queue.getJob();
104
105 try {
106 Thread.sleep(500);
107 }
108 catch (InterruptedException ie) {
109 System.err.println(ie.getMessage());
110 }
111
112 synchronized (jobs) {
113 jobs.add(job);
114 }
115
116 Thread processor = new Thread(
117 new Runnable() {
118 public void run() {
119 job.run(Worker.this);
120 synchronized (jobs) {
121 jobs.remove(job);
122 }
123 queue.log(job);
124 }
125 });
126 processor.start();
127 }
128 }
129 }
130
131 /**
132 * Set the state of the Worker's health; Note that we synchronize this
133 * method since it's mutating a shared object (this class)
134 *
135 *@param health
136 */
137 private final synchronized void setHealth(int health) {
138 this.health = health;
139 }
140 }
|