1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.common.util.execute.impl;
17
18 import static com.google.common.base.Optional.absent;
19 import static com.google.common.base.Stopwatch.createStarted;
20 import static com.google.common.collect.Lists.newArrayList;
21 import static java.lang.Math.ceil;
22 import static java.lang.Math.max;
23 import static org.kuali.common.util.FormatUtils.getTime;
24 import static org.kuali.common.util.base.Precondition.checkNotNull;
25 import static org.kuali.common.util.log.Loggers.newLogger;
26
27 import java.lang.Thread.UncaughtExceptionHandler;
28 import java.util.List;
29
30 import org.kuali.common.util.base.Threads;
31 import org.kuali.common.util.execute.Executable;
32 import org.slf4j.Logger;
33
34 import com.google.common.base.Optional;
35 import com.google.common.base.Stopwatch;
36 import com.google.common.collect.ImmutableList;
37 import com.google.common.collect.Lists;
38
39
40
41
42 public final class ConcurrentExecutables implements Executable, UncaughtExceptionHandler {
43
44 private static final Logger logger = newLogger();
45
46 private final ImmutableList<Executable> executables;
47 private final boolean skip;
48 private final boolean timed;
49 private final Optional<Integer> maxThreads;
50
51
52 private Optional<IllegalStateException> uncaughtException = absent();
53
54 public static void execute(Executable... executables) {
55 create(executables).execute();
56 }
57
58 public static void execute(List<Executable> executables) {
59 create(executables).execute();
60 }
61
62 public static void executeConcurrently(List<Executable> executables, int maxThreads) {
63 builder(executables).withMaxThreads(maxThreads).build().execute();
64 }
65
66 public static ConcurrentExecutables create(Executable... executables) {
67 return builder(executables).build();
68 }
69
70 public static ConcurrentExecutables create(List<Executable> executables) {
71 return builder(executables).build();
72 }
73
74 public static Builder builder(Executable... executables) {
75 return new Builder(executables);
76 }
77
78 public static Builder builder(List<Executable> executables) {
79 return new Builder(executables);
80 }
81
82 public static class Builder implements org.apache.commons.lang3.builder.Builder<ConcurrentExecutables> {
83
84
85 private final List<Executable> executables;
86
87
88 private boolean skip = false;
89 private boolean timed = false;
90 private Optional<Integer> maxThreads = absent();
91
92 public Builder(Executable... executables) {
93 this(ImmutableList.copyOf(executables));
94 }
95
96 public Builder(List<Executable> executables) {
97 this.executables = ImmutableList.copyOf(executables);
98 }
99
100 public Builder timed(boolean timed) {
101 this.timed = timed;
102 return this;
103 }
104
105 public Builder skip(boolean skip) {
106 this.skip = skip;
107 return this;
108 }
109
110 public Builder withMaxThreads(int maxThreads) {
111 this.maxThreads = Optional.of(maxThreads);
112 return this;
113 }
114
115 @Override
116 public ConcurrentExecutables build() {
117 ConcurrentExecutables instance = new ConcurrentExecutables(this);
118 validate(instance);
119 return instance;
120 }
121
122 private static void validate(ConcurrentExecutables instance) {
123 checkNotNull(instance.executables, "executables");
124 checkNotNull(instance.uncaughtException, "uncaughtException");
125 }
126 }
127
128 private ConcurrentExecutables(Builder builder) {
129 this.executables = ImmutableList.copyOf(builder.executables);
130 this.skip = builder.skip;
131 this.timed = builder.timed;
132 this.maxThreads = builder.maxThreads;
133 }
134
135 @Override
136 public void execute() {
137 if (skip) {
138 logger.info("Skipping execution of {} executables", executables.size());
139 return;
140 }
141 List<Thread> threads = getThreads(executables, maxThreads);
142 Stopwatch stopwatch = createStarted();
143 Threads.start(threads);
144 Threads.join(threads);
145 if (uncaughtException.isPresent()) {
146 throw uncaughtException.get();
147 }
148 if (timed) {
149 logger.info("------------------------------------------------------------------------");
150 logger.info("Total Time: {} (Wall Clock)", getTime(stopwatch));
151 logger.info("------------------------------------------------------------------------");
152 }
153 }
154
155 protected List<Thread> getThreads(List<Executable> executables, Optional<Integer> maxThreads) {
156 int max = maxThreads.isPresent() ? maxThreads.get() : executables.size();
157 int size = (int) max(ceil(executables.size() / (max * 1D)), 1);
158 List<List<Executable>> partitions = Lists.partition(executables, size);
159 List<Thread> threads = newArrayList();
160 for (List<Executable> partition : partitions) {
161 Runnable runnable = new ExecutablesRunner(partition);
162 Thread thread = new Thread(runnable, "Executable");
163 thread.setUncaughtExceptionHandler(this);
164 threads.add(thread);
165 }
166 return threads;
167 }
168
169 @Override
170 public synchronized void uncaughtException(Thread thread, Throwable uncaughtException) {
171
172
173 if (!this.uncaughtException.isPresent()) {
174 String context = "Exception in thread [" + thread.getId() + ":" + thread.getName() + "]";
175 this.uncaughtException = Optional.of(new IllegalStateException(context, uncaughtException));
176 }
177 }
178
179 public ImmutableList<Executable> getExecutables() {
180 return executables;
181 }
182
183 public boolean isSkip() {
184 return skip;
185 }
186
187 public boolean isTimed() {
188 return timed;
189 }
190
191 public Optional<IllegalStateException> getUncaughtException() {
192 return uncaughtException;
193 }
194
195 }