View Javadoc
1   /**
2    * Copyright 2010-2014 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.common.util.execute.impl;
17  
18  import static com.google.common.base.Optional.absent;
19  import static com.google.common.base.Stopwatch.createStarted;
20  import static com.google.common.collect.Lists.newArrayList;
21  import static java.lang.Math.ceil;
22  import static java.lang.Math.max;
23  import static org.kuali.common.util.FormatUtils.getTime;
24  import static org.kuali.common.util.base.Precondition.checkNotNull;
25  import static org.kuali.common.util.log.Loggers.newLogger;
26  
27  import java.lang.Thread.UncaughtExceptionHandler;
28  import java.util.List;
29  
30  import org.kuali.common.util.base.Threads;
31  import org.kuali.common.util.execute.Executable;
32  import org.slf4j.Logger;
33  
34  import com.google.common.base.Optional;
35  import com.google.common.base.Stopwatch;
36  import com.google.common.collect.ImmutableList;
37  import com.google.common.collect.Lists;
38  
39  /**
40   * Create a new thread for each executable in the list and run them all concurrently.
41   */
42  public final class ConcurrentExecutables implements Executable, UncaughtExceptionHandler {
43  
44  	private static final Logger logger = newLogger();
45  
46  	private final ImmutableList<Executable> executables;
47  	private final boolean skip;
48  	private final boolean timed;
49  	private final Optional<Integer> maxThreads;
50  
51  	// If any thread throws an exception, this gets filled in
52  	private Optional<IllegalStateException> uncaughtException = absent();
53  
54  	public static void execute(Executable... executables) {
55  		create(executables).execute();
56  	}
57  
58  	public static void execute(List<Executable> executables) {
59  		create(executables).execute();
60  	}
61  
62  	public static void executeConcurrently(List<Executable> executables, int maxThreads) {
63  		builder(executables).withMaxThreads(maxThreads).build().execute();
64  	}
65  
66  	public static ConcurrentExecutables create(Executable... executables) {
67  		return builder(executables).build();
68  	}
69  
70  	public static ConcurrentExecutables create(List<Executable> executables) {
71  		return builder(executables).build();
72  	}
73  
74  	public static Builder builder(Executable... executables) {
75  		return new Builder(executables);
76  	}
77  
78  	public static Builder builder(List<Executable> executables) {
79  		return new Builder(executables);
80  	}
81  
82  	public static class Builder implements org.apache.commons.lang3.builder.Builder<ConcurrentExecutables> {
83  
84  		// Required
85  		private final List<Executable> executables;
86  
87  		// Optional
88  		private boolean skip = false;
89  		private boolean timed = false;
90  		private Optional<Integer> maxThreads = absent();
91  
92  		public Builder(Executable... executables) {
93  			this(ImmutableList.copyOf(executables));
94  		}
95  
96  		public Builder(List<Executable> executables) {
97  			this.executables = ImmutableList.copyOf(executables);
98  		}
99  
100 		public Builder timed(boolean timed) {
101 			this.timed = timed;
102 			return this;
103 		}
104 
105 		public Builder skip(boolean skip) {
106 			this.skip = skip;
107 			return this;
108 		}
109 
110 		public Builder withMaxThreads(int maxThreads) {
111 			this.maxThreads = Optional.of(maxThreads);
112 			return this;
113 		}
114 
115 		@Override
116 		public ConcurrentExecutables build() {
117 			ConcurrentExecutables instance = new ConcurrentExecutables(this);
118 			validate(instance);
119 			return instance;
120 		}
121 
122 		private static void validate(ConcurrentExecutables instance) {
123 			checkNotNull(instance.executables, "executables");
124 			checkNotNull(instance.uncaughtException, "uncaughtException");
125 		}
126 	}
127 
128 	private ConcurrentExecutables(Builder builder) {
129 		this.executables = ImmutableList.copyOf(builder.executables);
130 		this.skip = builder.skip;
131 		this.timed = builder.timed;
132 		this.maxThreads = builder.maxThreads;
133 	}
134 
135 	@Override
136 	public void execute() {
137 		if (skip) {
138 			logger.info("Skipping execution of {} executables", executables.size());
139 			return;
140 		}
141 		List<Thread> threads = getThreads(executables, maxThreads);
142 		Stopwatch stopwatch = createStarted();
143 		Threads.start(threads);
144 		Threads.join(threads);
145 		if (uncaughtException.isPresent()) {
146 			throw uncaughtException.get();
147 		}
148 		if (timed) {
149 			logger.info("------------------------------------------------------------------------");
150 			logger.info("Total Time: {} (Wall Clock)", getTime(stopwatch));
151 			logger.info("------------------------------------------------------------------------");
152 		}
153 	}
154 
155 	protected List<Thread> getThreads(List<Executable> executables, Optional<Integer> maxThreads) {
156 		int max = maxThreads.isPresent() ? maxThreads.get() : executables.size();
157 		int size = (int) max(ceil(executables.size() / (max * 1D)), 1);
158 		List<List<Executable>> partitions = Lists.partition(executables, size);
159 		List<Thread> threads = newArrayList();
160 		for (List<Executable> partition : partitions) {
161 			Runnable runnable = new ExecutablesRunner(partition);
162 			Thread thread = new Thread(runnable, "Executable");
163 			thread.setUncaughtExceptionHandler(this);
164 			threads.add(thread);
165 		}
166 		return threads;
167 	}
168 
169 	@Override
170 	public synchronized void uncaughtException(Thread thread, Throwable uncaughtException) {
171 		// Only report back on the first uncaught exception reported by any thread
172 		// Any exceptions after the first one get ignored
173 		if (!this.uncaughtException.isPresent()) {
174 			String context = "Exception in thread [" + thread.getId() + ":" + thread.getName() + "]";
175 			this.uncaughtException = Optional.of(new IllegalStateException(context, uncaughtException));
176 		}
177 	}
178 
179 	public ImmutableList<Executable> getExecutables() {
180 		return executables;
181 	}
182 
183 	public boolean isSkip() {
184 		return skip;
185 	}
186 
187 	public boolean isTimed() {
188 		return timed;
189 	}
190 
191 	public Optional<IllegalStateException> getUncaughtException() {
192 		return uncaughtException;
193 	}
194 
195 }