001 /**
002 *
003 * All content copyright (c) 2003-2008 Terracotta, Inc.,
004 * except as may otherwise be noted in a separate copyright notice.
005 * All rights reserved.
006 *
007 */
008 package demo.sharedqueue;
009
010 import java.io.File;
011 import java.net.InetAddress;
012 import java.net.UnknownHostException;
013
014 import javax.management.MBeanServer;
015 import javax.management.MBeanServerFactory;
016 import javax.management.MBeanServerNotification;
017 import javax.management.Notification;
018 import javax.management.NotificationFilter;
019 import javax.management.NotificationListener;
020 import javax.management.ObjectName;
021
022 import org.mortbay.jetty.Connector;
023 import org.mortbay.jetty.Handler;
024 import org.mortbay.jetty.Server;
025 import org.mortbay.jetty.bio.SocketConnector;
026 import org.mortbay.jetty.handler.ContextHandler;
027 import org.mortbay.jetty.handler.HandlerCollection;
028 import org.mortbay.jetty.handler.ResourceHandler;
029
030 /**
031 * Description of the Class
032 *
033 *@author Terracotta, Inc.
034 */
035 public class Main {
036 private final File cwd = new File(System.getProperty("user.dir"));
037
038 private int lastPortUsed;
039
040 private demo.sharedqueue.Queue queue;
041
042 private Worker worker;
043
044 public final void start(int port) throws Exception {
045 String nodeId = registerForNotifications();
046 port = setPort(port);
047
048 System.out.println("DSO SharedQueue (node " + nodeId + ")");
049 System.out.println("Open your browser and go to - http://"
050 + getHostName() + ":" + port + "/webapp\n");
051
052 Server server = new Server();
053 Connector connector = new SocketConnector();
054 connector.setPort(port);
055 server.setConnectors(new Connector[]{connector});
056
057 queue = new Queue(port);
058 worker = queue.createWorker(nodeId);
059
060 ResourceHandler resourceHandler = new ResourceHandler();
061 resourceHandler.setResourceBase(".");
062
063 ContextHandler ajaxContext = new ContextHandler();
064 ajaxContext.setContextPath(SimpleHttpHandler.ACTION);
065 ajaxContext.setResourceBase(cwd.getPath());
066 ajaxContext.setClassLoader(Thread.currentThread()
067 .getContextClassLoader());
068 ajaxContext.addHandler(new SimpleHttpHandler(queue));
069
070 HandlerCollection handlers = new HandlerCollection();
071 handlers.setHandlers(new Handler[]{ajaxContext, resourceHandler});
072 server.setHandler(handlers);
073
074 startReaper();
075 server.start();
076 server.join();
077 }
078
079 private final int setPort(int port) {
080 if (port == -1) {
081 if (lastPortUsed == 0) {
082 port = lastPortUsed = 1990;
083 }
084 else {
085 port = ++lastPortUsed;
086 }
087 }
088 else {
089 lastPortUsed = port;
090 }
091 return port;
092 }
093
094 /**
095 * Starts a thread to identify dead workers (From nodes that have been
096 * brought down) and removes them from the (shared) list of workers.
097 */
098 private final void startReaper() {
099 Thread reaper = new Thread(
100 new Runnable() {
101 public void run() {
102 while (true) {
103 Main.this.queue.reap();
104 try {
105 Thread.sleep(1000);
106 }
107 catch (InterruptedException ie) {
108 System.err.println(ie.getMessage());
109 }
110 }
111 }
112 });
113 reaper.start();
114 }
115
116 /**
117 * Registers this client for JMX notifications.
118 *
119 *@return Description of the Returned Value
120 *@exception Exception Description of Exception
121 *@returns This clients Node ID
122 */
123 private final String registerForNotifications() throws Exception {
124 java.util.List servers = MBeanServerFactory.findMBeanServer(null);
125 if (servers.size() == 0) {
126
127 System.err.println("WARNING: No JMX servers found, unable to register for notifications.");
128 return "0";
129 }
130
131 MBeanServer server = (MBeanServer) servers.get(0);
132 final ObjectName clusterBean = new ObjectName(
133 "org.terracotta:type=Terracotta Cluster,name=Terracotta Cluster Bean");
134 ObjectName delegateName =
135 ObjectName.getInstance("JMImplementation:type=MBeanServerDelegate");
136 final java.util.List clusterBeanBag = new java.util.ArrayList();
137
138 // listener for newly registered MBeans
139 NotificationListener listener0 =
140 new NotificationListener() {
141 public void handleNotification(Notification notification,
142 Object handback) {
143 synchronized (clusterBeanBag) {
144 clusterBeanBag.add(handback);
145 clusterBeanBag.notifyAll();
146 }
147 }
148 };
149
150 // filter to let only clusterBean passed through
151 NotificationFilter filter0 =
152 new NotificationFilter() {
153 public boolean isNotificationEnabled(Notification notification) {
154 if (notification.getType().equals("JMX.mbean.registered")
155 && ((MBeanServerNotification) notification)
156 .getMBeanName().equals(clusterBean)) {
157 return true;
158 }
159 return false;
160 }
161 };
162
163 // add our listener for clusterBean's registration
164 server.addNotificationListener(delegateName, listener0, filter0,
165 clusterBean);
166
167 // because of race condition, clusterBean might already have registered
168 // before we registered the listener
169 java.util.Set allObjectNames = server.queryNames(null, null);
170
171 if (!allObjectNames.contains(clusterBean)) {
172 synchronized (clusterBeanBag) {
173 while (clusterBeanBag.isEmpty()) {
174 clusterBeanBag.wait();
175 }
176 }
177 }
178
179 // clusterBean is now registered, no need to listen for it
180 server.removeNotificationListener(delegateName, listener0);
181
182 // listener for clustered bean events
183 NotificationListener listener1 =
184 new NotificationListener() {
185 public void handleNotification(Notification notification,
186 Object handback) {
187 String nodeId = notification.getMessage();
188 Worker worker = Main.this.queue.getWorker(nodeId);
189 if (worker != null) {
190 worker.markForExpiration();
191 }
192 else {
193 System.err.println("Worker for nodeId: " + nodeId
194 + " not found.");
195 }
196 }
197 };
198
199 // filter for nodeDisconnected notifications only
200 NotificationFilter filter1 =
201 new NotificationFilter() {
202 public boolean isNotificationEnabled(Notification notification) {
203 return notification.getType().equals(
204 "com.tc.cluster.event.nodeDisconnected");
205 }
206 };
207
208 // now that we have the clusterBean, add listener for membership events
209 server.addNotificationListener(clusterBean, listener1, filter1,
210 clusterBean);
211 return (server.getAttribute(clusterBean, "NodeId")).toString();
212 }
213
214 public static final void main(String[] args) throws Exception {
215 int port = -1;
216 try {
217 port = Integer.parseInt(args[0]);
218 }
219 catch (Exception e) {
220 }
221 (new Main()).start(port);
222 }
223
224 static final String getHostName() {
225 try {
226 InetAddress addr = InetAddress.getLocalHost();
227 return addr.getHostName();
228 }
229 catch (UnknownHostException e) {
230 return "Unknown";
231 }
232 }
233 }
|