Worker.java
001 /**
002  *
003  * All content copyright (c) 2003-2008 Terracotta, Inc.,
004  * except as may otherwise be noted in a separate copyright notice.
005  * All rights reserved.
006  *
007  */
008 package demo.sharedqueue;
009 
010 import java.util.Collections;
011 import java.util.LinkedList;
012 import java.util.List;
013 import java.util.ListIterator;
014 
015 /**
016  *  Description of the Class
017  *
018  *@author    Terracotta, Inc.
019  */
020 class Worker implements Runnable {
021 
022    private final String name;
023 
024    private final int port;
025 
026    private final Queue queue;
027 
028    private final List jobs;
029 
030    private final String nodeId;
031 
032    private int health = HEALTH_ALIVE;
033 
034    private static final int HEALTH_ALIVE = 0;
035 
036    private static final int HEALTH_DYING = 1;
037 
038    private static final int HEALTH_DEAD = 2;
039 
040    private static final int MAX_LOAD = 10;
041 
042    public Worker(Queue queue, int port, String nodeId) {
043       this.name = Queue.getHostName();
044       this.port = port;
045       this.queue = queue;
046       this.nodeId = nodeId;
047       jobs = Collections.synchronizedList(new LinkedList());
048    }
049 
050    public final String getNodeId() {
051       return this.nodeId;
052    }
053 
054    public final String getName() {
055       return "node: " + nodeId + " (" + name + ":" + port + ")";
056    }
057 
058    public final String toXml() {
059       synchronized (jobs) {
060          String data = "<worker><name>" + getName() "</name><jobs>";
061          ListIterator i = jobs.listIterator();
062          while (i.hasNext()) {
063             data += ((Jobi.next()).toXml();
064          }
065          data += "</jobs></worker>";
066          return data;
067       }
068    }
069 
070    /**
071     *  Attempt to mark the Worker as dead (if it's already dying); Note that
072     *  we synchronize this method since it's mutating a shared object (this
073     *  class)
074     *
075     *@return    True if the Worker is dead.
076     */
077    public final synchronized boolean expire() {
078       if (HEALTH_DYING == health) {
079          // a dying Worker wont die until it has
080          // consumed all of it's jobs
081          if (jobs.size() 0) {
082             queue.addJob((Jobjobs.remove(0));
083          }
084          else {
085             setHealth(HEALTH_DEAD);
086          }
087       }
088       return (HEALTH_DEAD == health);
089    }
090 
091    /**
092     *  Set the state of the Worker's health to dying; Note that we synchronize
093     *  this method since it's mutating a shared object (this class)
094     *
095     */
096    public final synchronized void markForExpiration() {
097       setHealth(HEALTH_DYING);
098    }
099 
100    public final void run() {
101       while (HEALTH_DEAD != health) {
102          if ((HEALTH_ALIVE == health&& (jobs.size() < MAX_LOAD)) {
103             final Job job = queue.getJob();
104 
105             try {
106                Thread.sleep(500);
107             }
108             catch (InterruptedException ie) {
109                System.err.println(ie.getMessage());
110             }
111 
112             synchronized (jobs) {
113                jobs.add(job);
114             }
115 
116             Thread processor = new Thread(
117                      new Runnable() {
118                         public void run() {
119                            job.run(Worker.this);
120                            synchronized (jobs) {
121                               jobs.remove(job);
122                            }
123                            queue.log(job);
124                         }
125                      });
126             processor.start();
127          }
128       }
129    }
130 
131    /**
132     *  Set the state of the Worker's health; Note that we synchronize this
133     *  method since it's mutating a shared object (this class)
134     *
135     *@param  health
136     */
137    private final synchronized void setHealth(int health) {
138       this.health = health;
139    }
140 }