001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.usage;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.LinkedList;
022import java.util.List;
023import java.util.concurrent.CopyOnWriteArrayList;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.activemq.Service;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * Used to keep track of how much of something is being used so that a
032 * productive working set usage can be controlled. Main use case is manage
033 * memory usage.
034 * 
035 * @org.apache.xbean.XBean
036 * 
037 */
038public abstract class Usage<T extends Usage> implements Service {
039
040    private static final Logger LOG = LoggerFactory.getLogger(Usage.class);
041    protected final Object usageMutex = new Object();
042    protected int percentUsage;
043    protected T parent;
044    private UsageCapacity limiter = new DefaultUsageCapacity();
045    private int percentUsageMinDelta = 1;
046    private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
047    private final boolean debug = LOG.isDebugEnabled();
048    protected String name;
049    private float usagePortion = 1.0f;
050    private final List<T> children = new CopyOnWriteArrayList<T>();
051    private final List<Runnable> callbacks = new LinkedList<Runnable>();
052    private int pollingTime = 100;
053    private final AtomicBoolean started=new AtomicBoolean();
054    private ThreadPoolExecutor executor;
055    public Usage(T parent, String name, float portion) {
056        this.parent = parent;
057        this.usagePortion = portion;
058        if (parent != null) {
059            this.limiter.setLimit((long)(parent.getLimit() * portion));
060            name = parent.name + ":" + name;
061        }
062        this.name = name;
063    }
064
065    protected abstract long retrieveUsage();
066
067    /**
068     * @throws InterruptedException
069     */
070    public void waitForSpace() throws InterruptedException {
071        waitForSpace(0);
072    }
073
074    public boolean waitForSpace(long timeout) throws InterruptedException {
075        return waitForSpace(timeout, 100);
076    }
077    
078    /**
079     * @param timeout
080     * @throws InterruptedException
081     * @return true if space
082     */
083    public boolean waitForSpace(long timeout, int highWaterMark) throws InterruptedException {
084        if (parent != null) {
085            if (!parent.waitForSpace(timeout, highWaterMark)) {
086                return false;
087            }
088        }
089        synchronized (usageMutex) {
090            percentUsage=caclPercentUsage();
091            if (percentUsage >= highWaterMark) {
092                long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
093                long timeleft = deadline;
094                while (timeleft > 0) {
095                    percentUsage=caclPercentUsage();
096                    if (percentUsage >= highWaterMark) {
097                        usageMutex.wait(pollingTime);
098                        timeleft = deadline - System.currentTimeMillis();
099                    } else {
100                        break;
101                    }
102                }
103            }
104            return percentUsage < highWaterMark;
105        }
106    }
107
108    public boolean isFull() {
109        return isFull(100);
110    }
111    
112    public boolean isFull(int highWaterMark) {
113        if (parent != null && parent.isFull(highWaterMark)) {
114            return true;
115        }
116        synchronized (usageMutex) {
117            percentUsage=caclPercentUsage();
118            return percentUsage >= highWaterMark;
119        }
120    }
121
122    public void addUsageListener(UsageListener listener) {
123        listeners.add(listener);
124    }
125
126    public void removeUsageListener(UsageListener listener) {
127        listeners.remove(listener);
128    }
129
130    public long getLimit() {
131        synchronized (usageMutex) {
132            return limiter.getLimit();
133        }
134    }
135
136    /**
137     * Sets the memory limit in bytes. Setting the limit in bytes will set the
138     * usagePortion to 0 since the UsageManager is not going to be portion based
139     * off the parent.
140     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
141     * 
142     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
143     */
144    public void setLimit(long limit) {
145        if (percentUsageMinDelta < 0) {
146            throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
147        }
148        synchronized (usageMutex) {
149            this.limiter.setLimit(limit);
150            this.usagePortion = 0;
151        }
152        onLimitChange();
153    }
154
155    protected void onLimitChange() {
156        // We may need to calculate the limit
157        if (usagePortion > 0 && parent != null) {
158            synchronized (usageMutex) {
159                this.limiter.setLimit((long)(parent.getLimit() * usagePortion));
160            }
161        }
162        // Reset the percent currently being used.
163        int percentUsage;
164        synchronized (usageMutex) {
165            percentUsage = caclPercentUsage();
166        }
167        setPercentUsage(percentUsage);
168        // Let the children know that the limit has changed. They may need to
169        // set
170        // their limits based on ours.
171        for (T child : children) {
172            child.onLimitChange();
173        }
174    }
175
176    public float getUsagePortion() {
177        synchronized (usageMutex) {
178            return usagePortion;
179        }
180    }
181
182    public void setUsagePortion(float usagePortion) {
183        synchronized (usageMutex) {
184            this.usagePortion = usagePortion;
185        }
186        onLimitChange();
187    }
188
189    public int getPercentUsage() {
190        synchronized (usageMutex) {
191            return percentUsage;
192        }
193    }
194
195    public int getPercentUsageMinDelta() {
196        synchronized (usageMutex) {
197            return percentUsageMinDelta;
198        }
199    }
200
201    /**
202     * Sets the minimum number of percentage points the usage has to change
203     * before a UsageListener event is fired by the manager.
204     * 
205     * @param percentUsageMinDelta
206     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
207     */
208    public void setPercentUsageMinDelta(int percentUsageMinDelta) {
209        if (percentUsageMinDelta < 1) {
210            throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
211        }
212        int percentUsage;
213        synchronized (usageMutex) {
214            this.percentUsageMinDelta = percentUsageMinDelta;
215            percentUsage = caclPercentUsage();
216        }
217        setPercentUsage(percentUsage);
218    }
219
220    public long getUsage() {
221        synchronized (usageMutex) {
222            return retrieveUsage();
223        }
224    }
225
226    protected void setPercentUsage(int value) {
227        synchronized (usageMutex) {
228            int oldValue = percentUsage;
229            percentUsage = value;
230            if (oldValue != value) {
231                fireEvent(oldValue, value);
232            }
233        }
234    }
235
236    protected int caclPercentUsage() {
237        if (limiter.getLimit() == 0) {
238            return 0;
239        }
240        return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
241    }
242
243    private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
244        if (debug) {
245            LOG.debug(getName() + ": usage change from: " + oldPercentUsage + "% of available memory, to: " 
246                + newPercentUsage + "% of available memory");
247        }   
248        if (started.get()) {
249            // Switching from being full to not being full..
250            if (oldPercentUsage >= 100 && newPercentUsage < 100) {
251                synchronized (usageMutex) {
252                    usageMutex.notifyAll();
253                    if (!callbacks.isEmpty()) {
254                        for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
255                            Runnable callback = iter.next();
256                            getExecutor().execute(callback);
257                        }
258                        callbacks.clear();
259                    }
260                }
261            }
262            if (!listeners.isEmpty()) {
263                // Let the listeners know on a separate thread
264                Runnable listenerNotifier = new Runnable() {
265                    public void run() {
266                        for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
267                            UsageListener l = iter.next();
268                            l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
269                        }
270                    }
271                };
272                if (started.get()) {
273                    getExecutor().execute(listenerNotifier);
274                } else {
275                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
276                }
277            }
278        }
279    }
280
281    public String getName() {
282        return name;
283    }
284
285    @Override
286    public String toString() {
287        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
288    }
289
290    @SuppressWarnings("unchecked")
291    public void start() {
292        if (started.compareAndSet(false, true)){
293            if (parent != null) {
294                parent.addChild(this);
295            }
296            for (T t:children) {
297                t.start();
298            }
299        }
300    }
301
302    @SuppressWarnings("unchecked")
303    public void stop() {
304        if (started.compareAndSet(true, false)){
305            if (parent != null) {
306                parent.removeChild(this);
307            }
308            
309            //clear down any callbacks
310            synchronized (usageMutex) {
311                usageMutex.notifyAll();
312                for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
313                    Runnable callback = iter.next();
314                    callback.run();
315                }
316                this.callbacks.clear();
317            }
318            for (T t:children) {
319                t.stop();
320            }
321        }
322    }
323
324    protected void addChild(T child) {
325        children.add(child);
326        if (started.get()) {
327            child.start();
328        }
329    }
330
331    protected void removeChild(T child) {
332        children.remove(child);
333    }
334
335    /**
336     * @param callback
337     * @return true if the UsageManager was full. The callback will only be
338     *         called if this method returns true.
339     */
340    public boolean notifyCallbackWhenNotFull(final Runnable callback) {
341        if (parent != null) {
342            Runnable r = new Runnable() {
343
344                public void run() {
345                    synchronized (usageMutex) {
346                        if (percentUsage >= 100) {
347                            callbacks.add(callback);
348                        } else {
349                            callback.run();
350                        }
351                    }
352                }
353            };
354            if (parent.notifyCallbackWhenNotFull(r)) {
355                return true;
356            }
357        }
358        synchronized (usageMutex) {
359            if (percentUsage >= 100) {
360                callbacks.add(callback);
361                return true;
362            } else {
363                return false;
364            }
365        }
366    }
367
368    /**
369     * @return the limiter
370     */
371    public UsageCapacity getLimiter() {
372        return this.limiter;
373    }
374
375    /**
376     * @param limiter the limiter to set
377     */
378    public void setLimiter(UsageCapacity limiter) {
379        this.limiter = limiter;
380    }
381
382    /**
383     * @return the pollingTime
384     */
385    public int getPollingTime() {
386        return this.pollingTime;
387    }
388
389    /**
390     * @param pollingTime the pollingTime to set
391     */
392    public void setPollingTime(int pollingTime) {
393        this.pollingTime = pollingTime;
394    }
395
396    public void setName(String name) {
397        this.name = name;
398    }
399
400    public T getParent() {
401        return parent;
402    }
403
404    public void setParent(T parent) {
405        this.parent = parent;
406    }
407    
408    public void setExecutor (ThreadPoolExecutor executor) {
409        this.executor = executor;
410    }
411    public ThreadPoolExecutor getExecutor() {
412        return executor;
413    }
414}