001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.thread;
018
019import java.util.concurrent.Executor;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.SynchronousQueue;
022import java.util.concurrent.ThreadFactory;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicLong;
026
027/**
028 * Manages the thread pool for long running tasks. Long running tasks are not
029 * always active but when they are active, they may need a few iterations of
030 * processing for them to become idle. The manager ensures that each task is
031 * processes but that no one task overtakes the system. This is kinda like
032 * cooperative multitasking.
033 * 
034 * 
035 */
036public class TaskRunnerFactory implements Executor {
037
038    private ExecutorService executor;
039    private int maxIterationsPerRun;
040    private String name;
041    private int priority;
042    private boolean daemon;
043    private AtomicLong id = new AtomicLong(0);
044
045    public TaskRunnerFactory() {
046        this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000);
047    }
048    
049    private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
050        this(name,priority,daemon,maxIterationsPerRun,false);
051    }
052
053
054    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
055
056        this.name = name;
057        this.priority = priority;
058        this.daemon = daemon;
059        this.maxIterationsPerRun = maxIterationsPerRun;
060
061        // If your OS/JVM combination has a good thread model, you may want to
062        // avoid
063        // using a thread pool to run tasks and use a DedicatedTaskRunner
064        // instead.
065        if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
066            executor = null;
067        } else {
068            executor = createDefaultExecutor();
069        }
070    }
071
072    public void shutdown() {
073        if (executor != null) {
074            executor.shutdownNow();
075        }
076    }
077
078    public TaskRunner createTaskRunner(Task task, String name) {
079        if (executor != null) {
080            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
081        } else {
082            return new DedicatedTaskRunner(task, name, priority, daemon);
083        }
084    }
085
086    public void execute(Runnable runnable) {
087        execute(runnable, "ActiveMQ Task");
088    }
089    
090    public void execute(Runnable runnable, String name) {
091        if (executor != null) {
092            executor.execute(runnable);
093        } else {
094            new Thread(runnable, name + "-" + id.incrementAndGet()).start();
095        }
096    }
097
098    protected ExecutorService createDefaultExecutor() {
099        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
100            public Thread newThread(Runnable runnable) {
101                Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet());
102                thread.setDaemon(daemon);
103                thread.setPriority(priority);
104                return thread;
105            }
106        });
107        // rc.allowCoreThreadTimeOut(true);
108        return rc;
109    }
110
111}