001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.apache.activemq.broker.BrokerService;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * @org.apache.xbean.XBean
030 */
031 public class DefaultIOExceptionHandler implements IOExceptionHandler {
032
033    private static final Logger LOG = LoggerFactory
034            .getLogger(DefaultIOExceptionHandler.class);
035    private BrokerService broker;
036    private boolean ignoreAllErrors = false;
037    private boolean ignoreNoSpaceErrors = true;
038    private boolean ignoreSQLExceptions = true;
039    private boolean stopStartConnectors = false;
040    private String noSpaceMessage = "space";
041    private String sqlExceptionMessage = ""; // match all
042    private long resumeCheckSleepPeriod = 5*1000;
043    private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044
045    public void handle(IOException exception) {
046        if (ignoreAllErrors) {
047            LOG.info("Ignoring IO exception, " + exception, exception);
048            return;
049        }
050
051        if (ignoreNoSpaceErrors) {
052            Throwable cause = exception;
053            while (cause != null && cause instanceof IOException) {
054                if (cause.getMessage().contains(noSpaceMessage)) {
055                    LOG.info("Ignoring no space left exception, " + exception, exception);
056                    return;
057                }
058                cause = cause.getCause();
059            }
060        }
061
062        if (ignoreSQLExceptions) {
063            Throwable cause = exception;
064            while (cause != null) {
065                if (cause instanceof SQLException && cause.getMessage().contains(sqlExceptionMessage)) {
066                    LOG.info("Ignoring SQLException, " + exception, cause);
067                    return;
068                }
069                cause = cause.getCause();
070            }
071        }
072
073        if (stopStartConnectors) {
074            if (!stopStartInProgress.compareAndSet(false, true)) {
075                // we are already working on it
076                return;
077            }
078            LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
079
080            new Thread("stop transport connectors on IO exception") {
081                public void run() {
082                    try {
083                        ServiceStopper stopper = new ServiceStopper();
084                        broker.stopAllConnectors(stopper);
085                    } catch (Exception e) {
086                        LOG.warn("Failure occurred while stopping broker connectors", e);
087                    }
088                }
089            }.start();
090
091            // resume again
092            new Thread("restart transport connectors post IO exception") {
093                public void run() {
094                    try {
095                        while (isPersistenceAdapterDown()) {
096                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
097                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
098                        }
099                        broker.startAllConnectors();
100                    } catch (Exception e) {
101                        LOG.warn("Failure occurred while restarting broker connectors", e);
102                    } finally {
103                        stopStartInProgress.compareAndSet(true, false);
104                    }
105                }
106
107                private boolean isPersistenceAdapterDown() {
108                    boolean checkpointSuccess = false;
109                    try {
110                        broker.getPersistenceAdapter().checkpoint(true);
111                        checkpointSuccess = true;
112                    } catch (Throwable ignored) {}
113                    return !checkpointSuccess;
114                }
115            }.start();
116
117            return;
118        }
119
120        LOG.info("Stopping the broker due to IO exception, " + exception, exception);
121        new Thread("Stopping the broker due to IO exception") {
122            public void run() {
123                try {
124                    broker.stop();
125                } catch (Exception e) {
126                    LOG.warn("Failure occurred while stopping broker", e);
127                }
128            }
129        }.start();
130    }
131
132    public void setBrokerService(BrokerService broker) {
133        this.broker = broker;
134    }
135
136    public boolean isIgnoreAllErrors() {
137        return ignoreAllErrors;
138    }
139
140    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
141        this.ignoreAllErrors = ignoreAllErrors;
142    }
143
144    public boolean isIgnoreNoSpaceErrors() {
145        return ignoreNoSpaceErrors;
146    }
147
148    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
149        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
150    }
151
152    public String getNoSpaceMessage() {
153        return noSpaceMessage;
154    }
155
156    public void setNoSpaceMessage(String noSpaceMessage) {
157        this.noSpaceMessage = noSpaceMessage;
158    }
159
160    public boolean isIgnoreSQLExceptions() {
161        return ignoreSQLExceptions;
162    }
163
164    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
165        this.ignoreSQLExceptions = ignoreSQLExceptions;
166    }
167
168    public String getSqlExceptionMessage() {
169        return sqlExceptionMessage;
170    }
171
172    public void setSqlExceptionMessage(String sqlExceptionMessage) {
173        this.sqlExceptionMessage = sqlExceptionMessage;
174    }
175
176    public boolean isStopStartConnectors() {
177        return stopStartConnectors;
178    }
179
180    public void setStopStartConnectors(boolean stopStartConnectors) {
181        this.stopStartConnectors = stopStartConnectors;
182    }
183
184    public long getResumeCheckSleepPeriod() {
185        return resumeCheckSleepPeriod;
186    }
187
188    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
189        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
190    }
191}