001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.usage; 018 019import java.util.ArrayList; 020import java.util.Iterator; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.CopyOnWriteArrayList; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.activemq.Service; 027import org.slf4j.Logger; 028import org.slf4j.LoggerFactory; 029 030/** 031 * Used to keep track of how much of something is being used so that a 032 * productive working set usage can be controlled. Main use case is manage 033 * memory usage. 034 * 035 * @org.apache.xbean.XBean 036 * 037 */ 038public abstract class Usage<T extends Usage> implements Service { 039 040 private static final Logger LOG = LoggerFactory.getLogger(Usage.class); 041 protected final Object usageMutex = new Object(); 042 protected int percentUsage; 043 protected T parent; 044 private UsageCapacity limiter = new DefaultUsageCapacity(); 045 private int percentUsageMinDelta = 1; 046 private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>(); 047 private final boolean debug = LOG.isDebugEnabled(); 048 protected String name; 049 private float usagePortion = 1.0f; 050 private final List<T> children = new CopyOnWriteArrayList<T>(); 051 private final List<Runnable> callbacks = new LinkedList<Runnable>(); 052 private int pollingTime = 100; 053 private final AtomicBoolean started=new AtomicBoolean(); 054 private ThreadPoolExecutor executor; 055 public Usage(T parent, String name, float portion) { 056 this.parent = parent; 057 this.usagePortion = portion; 058 if (parent != null) { 059 this.limiter.setLimit((long)(parent.getLimit() * portion)); 060 name = parent.name + ":" + name; 061 } 062 this.name = name; 063 } 064 065 protected abstract long retrieveUsage(); 066 067 /** 068 * @throws InterruptedException 069 */ 070 public void waitForSpace() throws InterruptedException { 071 waitForSpace(0); 072 } 073 074 public boolean waitForSpace(long timeout) throws InterruptedException { 075 return waitForSpace(timeout, 100); 076 } 077 078 /** 079 * @param timeout 080 * @throws InterruptedException 081 * @return true if space 082 */ 083 public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException { 084 if (parent != null) { 085 if (!parent.waitForSpace(timeout, highWaterMark)) { 086 return false; 087 } 088 } 089 synchronized (usageMutex) { 090 percentUsage=caclPercentUsage(); 091 if (percentUsage >= highWaterMark) { 092 long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE; 093 long timeleft = deadline; 094 while (timeleft > 0) { 095 percentUsage=caclPercentUsage(); 096 if (percentUsage >= highWaterMark) { 097 usageMutex.wait(pollingTime); 098 timeleft = deadline - System.currentTimeMillis(); 099 } else { 100 break; 101 } 102 } 103 } 104 return percentUsage < highWaterMark; 105 } 106 } 107 108 public boolean isFull() { 109 return isFull(100); 110 } 111 112 public boolean isFull(int highWaterMark) { 113 if (parent != null && parent.isFull(highWaterMark)) { 114 return true; 115 } 116 synchronized (usageMutex) { 117 percentUsage=caclPercentUsage(); 118 return percentUsage >= highWaterMark; 119 } 120 } 121 122 public void addUsageListener(UsageListener listener) { 123 listeners.add(listener); 124 } 125 126 public void removeUsageListener(UsageListener listener) { 127 listeners.remove(listener); 128 } 129 130 public long getLimit() { 131 synchronized (usageMutex) { 132 return limiter.getLimit(); 133 } 134 } 135 136 /** 137 * Sets the memory limit in bytes. Setting the limit in bytes will set the 138 * usagePortion to 0 since the UsageManager is not going to be portion based 139 * off the parent. 140 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 141 * 142 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 143 */ 144 public void setLimit(long limit) { 145 if (percentUsageMinDelta < 0) { 146 throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0"); 147 } 148 synchronized (usageMutex) { 149 this.limiter.setLimit(limit); 150 this.usagePortion = 0; 151 } 152 onLimitChange(); 153 } 154 155 protected void onLimitChange() { 156 // We may need to calculate the limit 157 if (usagePortion > 0 && parent != null) { 158 synchronized (usageMutex) { 159 this.limiter.setLimit((long)(parent.getLimit() * usagePortion)); 160 } 161 } 162 // Reset the percent currently being used. 163 int percentUsage; 164 synchronized (usageMutex) { 165 percentUsage = caclPercentUsage(); 166 } 167 setPercentUsage(percentUsage); 168 // Let the children know that the limit has changed. They may need to 169 // set 170 // their limits based on ours. 171 for (T child : children) { 172 child.onLimitChange(); 173 } 174 } 175 176 public float getUsagePortion() { 177 synchronized (usageMutex) { 178 return usagePortion; 179 } 180 } 181 182 public void setUsagePortion(float usagePortion) { 183 synchronized (usageMutex) { 184 this.usagePortion = usagePortion; 185 } 186 onLimitChange(); 187 } 188 189 public int getPercentUsage() { 190 synchronized (usageMutex) { 191 return percentUsage; 192 } 193 } 194 195 public int getPercentUsageMinDelta() { 196 synchronized (usageMutex) { 197 return percentUsageMinDelta; 198 } 199 } 200 201 /** 202 * Sets the minimum number of percentage points the usage has to change 203 * before a UsageListener event is fired by the manager. 204 * 205 * @param percentUsageMinDelta 206 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 207 */ 208 public void setPercentUsageMinDelta(int percentUsageMinDelta) { 209 if (percentUsageMinDelta < 1) { 210 throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0"); 211 } 212 int percentUsage; 213 synchronized (usageMutex) { 214 this.percentUsageMinDelta = percentUsageMinDelta; 215 percentUsage = caclPercentUsage(); 216 } 217 setPercentUsage(percentUsage); 218 } 219 220 public long getUsage() { 221 synchronized (usageMutex) { 222 return retrieveUsage(); 223 } 224 } 225 226 protected void setPercentUsage(int value) { 227 synchronized (usageMutex) { 228 int oldValue = percentUsage; 229 percentUsage = value; 230 if (oldValue != value) { 231 fireEvent(oldValue, value); 232 } 233 } 234 } 235 236 protected int caclPercentUsage() { 237 if (limiter.getLimit() == 0) { 238 return 0; 239 } 240 return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta); 241 } 242 243 private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { 244 if (debug) { 245 LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 246 + newPercentUsage + "% of available memory"); 247 } 248 if (started.get()) { 249 // Switching from being full to not being full.. 250 if (oldPercentUsage >= 100 && newPercentUsage < 100) { 251 synchronized (usageMutex) { 252 usageMutex.notifyAll(); 253 if (!callbacks.isEmpty()) { 254 for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) { 255 Runnable callback = iter.next(); 256 getExecutor().execute(callback); 257 } 258 callbacks.clear(); 259 } 260 } 261 } 262 if (!listeners.isEmpty()) { 263 // Let the listeners know on a separate thread 264 Runnable listenerNotifier = new Runnable() { 265 public void run() { 266 for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) { 267 UsageListener l = iter.next(); 268 l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); 269 } 270 } 271 }; 272 if (started.get()) { 273 getExecutor().execute(listenerNotifier); 274 } else { 275 LOG.warn("Not notifying memory usage change to listeners on shutdown"); 276 } 277 } 278 } 279 } 280 281 public String getName() { 282 return name; 283 } 284 285 @Override 286 public String toString() { 287 return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%"; 288 } 289 290 @SuppressWarnings("unchecked") 291 public void start() { 292 if (started.compareAndSet(false, true)){ 293 if (parent != null) { 294 parent.addChild(this); 295 } 296 for (T t:children) { 297 t.start(); 298 } 299 } 300 } 301 302 @SuppressWarnings("unchecked") 303 public void stop() { 304 if (started.compareAndSet(true, false)){ 305 if (parent != null) { 306 parent.removeChild(this); 307 } 308 309 //clear down any callbacks 310 synchronized (usageMutex) { 311 usageMutex.notifyAll(); 312 for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) { 313 Runnable callback = iter.next(); 314 callback.run(); 315 } 316 this.callbacks.clear(); 317 } 318 for (T t:children) { 319 t.stop(); 320 } 321 } 322 } 323 324 protected void addChild(T child) { 325 children.add(child); 326 if (started.get()) { 327 child.start(); 328 } 329 } 330 331 protected void removeChild(T child) { 332 children.remove(child); 333 } 334 335 /** 336 * @param callback 337 * @return true if the UsageManager was full. The callback will only be 338 * called if this method returns true. 339 */ 340 public boolean notifyCallbackWhenNotFull(final Runnable callback) { 341 if (parent != null) { 342 Runnable r = new Runnable() { 343 344 public void run() { 345 synchronized (usageMutex) { 346 if (percentUsage >= 100) { 347 callbacks.add(callback); 348 } else { 349 callback.run(); 350 } 351 } 352 } 353 }; 354 if (parent.notifyCallbackWhenNotFull(r)) { 355 return true; 356 } 357 } 358 synchronized (usageMutex) { 359 if (percentUsage >= 100) { 360 callbacks.add(callback); 361 return true; 362 } else { 363 return false; 364 } 365 } 366 } 367 368 /** 369 * @return the limiter 370 */ 371 public UsageCapacity getLimiter() { 372 return this.limiter; 373 } 374 375 /** 376 * @param limiter the limiter to set 377 */ 378 public void setLimiter(UsageCapacity limiter) { 379 this.limiter = limiter; 380 } 381 382 /** 383 * @return the pollingTime 384 */ 385 public int getPollingTime() { 386 return this.pollingTime; 387 } 388 389 /** 390 * @param pollingTime the pollingTime to set 391 */ 392 public void setPollingTime(int pollingTime) { 393 this.pollingTime = pollingTime; 394 } 395 396 public void setName(String name) { 397 this.name = name; 398 } 399 400 public T getParent() { 401 return parent; 402 } 403 404 public void setParent(T parent) { 405 this.parent = parent; 406 } 407 408 public void setExecutor (ThreadPoolExecutor executor) { 409 this.executor = executor; 410 } 411 public ThreadPoolExecutor getExecutor() { 412 return executor; 413 } 414}