001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.thread;
018
/**
 * A Valve is a synchronization object used to enable or disable the "flow" of
 * concurrent processing.
 * <p>
 * Worker threads bracket their work with {@link #increment()} and
 * {@link #decrement()}; a controlling thread uses {@link #turnOff()} to stop
 * new work from starting and to wait for in-flight work to drain, and
 * {@link #turnOn()} to let work flow again.
 * <p>
 * All state is guarded by a single private monitor, so instances may be shared
 * between threads.
 */
public final class Valve {

    /** Monitor guarding every field below; also the wait/notify condition. */
    private final Object mutex = new Object();
    /** Whether the valve currently lets {@link #increment()} callers through. */
    private boolean on;
    /** Number of threads currently inside {@link #turnOff()}. */
    private int turningOff;
    /** Number of {@link #increment()} calls not yet matched by {@link #decrement()}. */
    private int usage;

    /**
     * Creates a valve in the given initial state.
     *
     * @param on {@code true} to start with the valve on, {@code false} for off
     */
    public Valve(boolean on) {
        this.on = on;
    }

    /**
     * Turns the valve on. This method blocks until the valve is off.
     *
     * @throws InterruptedException if wait is interrupted
     */
    public void turnOn() throws InterruptedException {
        synchronized (mutex) {
            // Only an "off" valve can be turned on; wait for someone to turn it off.
            while (on) {
                mutex.wait();
            }
            on = true;
            // Wake threads blocked in increment() or turnOff() waiting for "on".
            mutex.notifyAll();
        }
    }

    /**
     * Reports the current state of the valve.
     *
     * @return {@code true} if the valve is on
     */
    public boolean isOn() {
        synchronized (mutex) {
            return on;
        }
    }

    /**
     * Turns the valve off. This method blocks until the valve is on and the
     * valve is not in use. While a thread is inside this method, new
     * {@link #increment()} calls block.
     *
     * @throws InterruptedException if wait is interrupted
     * @throws IllegalStateException if the internal turning-off counter is negative
     */
    public void turnOff() throws InterruptedException {
        synchronized (mutex) {
            if (turningOff < 0) {
                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
            }
            // Announce the intent first so that increment() stops admitting new users.
            ++turningOff;
            try {
                while (usage > 0 || !on) {
                    mutex.wait();
                }
                on = false;
            } finally {
                --turningOff;
                // Notify on every exit path, not only on success: if the wait above
                // was interrupted, threads parked in increment() because
                // turningOff > 0 must re-check their condition, otherwise they stay
                // blocked although the valve is still on.
                mutex.notifyAll();
            }
        }
    }

    /**
     * Increments the use counter of the valve. This method blocks if the valve
     * is off, or is being turned off.
     *
     * @throws InterruptedException if wait is interrupted
     * @throws IllegalStateException if an internal counter is negative
     */
    public void increment() throws InterruptedException {
        synchronized (mutex) {
            if (turningOff < 0) {
                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
            }
            if (usage < 0) {
                throw new IllegalStateException("Unbalanced usage: " + usage);
            }
            // Do we have to wait for the valve to be on?
            while (turningOff > 0 || !on) {
                mutex.wait();
            }
            usage++;
        }
    }

    /**
     * Decrements the use counter of the valve.
     * <p>
     * The counter is decremented before it is validated, so an unbalanced call
     * leaves the counter negative and subsequent {@link #increment()} calls
     * fail as well.
     *
     * @throws IllegalStateException if an internal counter is negative after
     *         the decrement
     */
    public void decrement() {
        synchronized (mutex) {
            usage--;
            if (turningOff < 0) {
                throw new IllegalStateException("Unbalanced turningOff: " + turningOff);
            }
            if (usage < 0) {
                throw new IllegalStateException("Unbalanced usage: " + usage);
            }
            // The last user is gone: let any pending turnOff() proceed.
            if (turningOff > 0 && usage < 1) {
                mutex.notifyAll();
            }
        }
    }

}