Main.java
001 /**
002  *
003  * All content copyright (c) 2003-2008 Terracotta, Inc.,
004  * except as may otherwise be noted in a separate copyright notice.
005  * All rights reserved.
006  *
007  */
008 package demo.sharedqueue;
009 
010 import java.io.File;
011 import java.net.InetAddress;
012 import java.net.UnknownHostException;
013 
014 import javax.management.MBeanServer;
015 import javax.management.MBeanServerFactory;
016 import javax.management.MBeanServerNotification;
017 import javax.management.Notification;
018 import javax.management.NotificationFilter;
019 import javax.management.NotificationListener;
020 import javax.management.ObjectName;
021 
022 import org.mortbay.jetty.Connector;
023 import org.mortbay.jetty.Handler;
024 import org.mortbay.jetty.Server;
025 import org.mortbay.jetty.bio.SocketConnector;
026 import org.mortbay.jetty.handler.ContextHandler;
027 import org.mortbay.jetty.handler.HandlerCollection;
028 import org.mortbay.jetty.handler.ResourceHandler;
029 
030 /**
031  *  Description of the Class
032  *
033  *@author    Terracotta, Inc.
034  */
035 public class Main {
036    private final File cwd = new File(System.getProperty("user.dir"));
037 
038    private int lastPortUsed;
039 
040    private demo.sharedqueue.Queue queue;
041 
042    private Worker worker;
043 
044    public final void start(int portthrows Exception {
045       String nodeId = registerForNotifications();
046       port = setPort(port);
047 
048       System.out.println("DSO SharedQueue (node " + nodeId + ")");
049       System.out.println("Open your browser and go to - http://"
050             + getHostName() ":" + port + "/webapp\n");
051 
052       Server server = new Server();
053       Connector connector = new SocketConnector();
054       connector.setPort(port);
055       server.setConnectors(new Connector[]{connector});
056 
057       queue = new Queue(port);
058       worker = queue.createWorker(nodeId);
059 
060       ResourceHandler resourceHandler = new ResourceHandler();
061       resourceHandler.setResourceBase(".");
062 
063       ContextHandler ajaxContext = new ContextHandler();
064       ajaxContext.setContextPath(SimpleHttpHandler.ACTION);
065       ajaxContext.setResourceBase(cwd.getPath());
066       ajaxContext.setClassLoader(Thread.currentThread()
067             .getContextClassLoader());
068       ajaxContext.addHandler(new SimpleHttpHandler(queue));
069 
070       HandlerCollection handlers = new HandlerCollection();
071       handlers.setHandlers(new Handler[]{ajaxContext, resourceHandler});
072       server.setHandler(handlers);
073 
074       startReaper();
075       server.start();
076       server.join();
077    }
078 
079    private final int setPort(int port) {
080       if (port == -1) {
081          if (lastPortUsed == 0) {
082             port = lastPortUsed = 1990;
083          }
084          else {
085             port = ++lastPortUsed;
086          }
087       }
088       else {
089          lastPortUsed = port;
090       }
091       return port;
092    }
093 
094    /**
095     *  Starts a thread to identify dead workers (From nodes that have been
096     *  brought down) and removes them from the (shared) list of workers.
097     */
098    private final void startReaper() {
099       Thread reaper = new Thread(
100                new Runnable() {
101                   public void run() {
102                      while (true) {
103                         Main.this.queue.reap();
104                         try {
105                            Thread.sleep(1000);
106                         }
107                         catch (InterruptedException ie) {
108                            System.err.println(ie.getMessage());
109                         }
110                      }
111                   }
112                });
113       reaper.start();
114    }
115 
116    /**
117     *  Registers this client for JMX notifications.
118     *
119     *@return                Description of the Returned Value
120     *@exception  Exception  Description of Exception
121     *@returns               This clients Node ID
122     */
123    private final String registerForNotifications() throws Exception {
124       java.util.List servers = MBeanServerFactory.findMBeanServer(null);
125       if (servers.size() == 0) {
126 
127          System.err.println("WARNING: No JMX servers found, unable to register for notifications.");
128          return "0";
129       }
130 
131       MBeanServer server = (MBeanServerservers.get(0);
132       final ObjectName clusterBean = new ObjectName(
133             "org.terracotta:type=Terracotta Cluster,name=Terracotta Cluster Bean");
134       ObjectName delegateName =
135             ObjectName.getInstance("JMImplementation:type=MBeanServerDelegate");
136       final java.util.List clusterBeanBag = new java.util.ArrayList();
137 
138       // listener for newly registered MBeans
139       NotificationListener listener0 =
140          new NotificationListener() {
141             public void handleNotification(Notification notification,
142                   Object handback) {
143                synchronized (clusterBeanBag) {
144                   clusterBeanBag.add(handback);
145                   clusterBeanBag.notifyAll();
146                }
147             }
148          };
149 
150       // filter to let only clusterBean passed through
151       NotificationFilter filter0 =
152          new NotificationFilter() {
153             public boolean isNotificationEnabled(Notification notification) {
154                if (notification.getType().equals("JMX.mbean.registered")
155                      && ((MBeanServerNotificationnotification)
156                      .getMBeanName().equals(clusterBean)) {
157                   return true;
158                }
159                return false;
160             }
161          };
162 
163       // add our listener for clusterBean's registration
164       server.addNotificationListener(delegateName, listener0, filter0,
165             clusterBean);
166 
167       // because of race condition, clusterBean might already have registered
168       // before we registered the listener
169       java.util.Set allObjectNames = server.queryNames(null, null);
170 
171       if (!allObjectNames.contains(clusterBean)) {
172          synchronized (clusterBeanBag) {
173             while (clusterBeanBag.isEmpty()) {
174                clusterBeanBag.wait();
175             }
176          }
177       }
178 
179       // clusterBean is now registered, no need to listen for it
180       server.removeNotificationListener(delegateName, listener0);
181 
182       // listener for clustered bean events
183       NotificationListener listener1 =
184          new NotificationListener() {
185             public void handleNotification(Notification notification,
186                   Object handback) {
187                String nodeId = notification.getMessage();
188                Worker worker = Main.this.queue.getWorker(nodeId);
189                if (worker != null) {
190                   worker.markForExpiration();
191                }
192                else {
193                   System.err.println("Worker for nodeId: " + nodeId
194                         " not found.");
195                }
196             }
197          };
198 
199       // filter for nodeDisconnected notifications only
200       NotificationFilter filter1 =
201          new NotificationFilter() {
202             public boolean isNotificationEnabled(Notification notification) {
203                return notification.getType().equals(
204                      "com.tc.cluster.event.nodeDisconnected");
205             }
206          };
207 
208       // now that we have the clusterBean, add listener for membership events
209       server.addNotificationListener(clusterBean, listener1, filter1,
210             clusterBean);
211       return (server.getAttribute(clusterBean, "NodeId")).toString();
212    }
213 
214    public static final void main(String[] argsthrows Exception {
215       int port = -1;
216       try {
217          port = Integer.parseInt(args[0]);
218       }
219       catch (Exception e) {
220       }
221       (new Main()).start(port);
222    }
223 
224    static final String getHostName() {
225       try {
226          InetAddress addr = InetAddress.getLocalHost();
227          return addr.getHostName();
228       }
229       catch (UnknownHostException e) {
230          return "Unknown";
231       }
232    }
233 }