1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.common.jdbc;
17
18 import java.io.IOException;
19 import java.sql.Connection;
20 import java.sql.DatabaseMetaData;
21 import java.sql.SQLException;
22 import java.sql.Statement;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.List;
27
28 import javax.sql.DataSource;
29
30 import org.apache.commons.lang3.StringUtils;
31 import org.kuali.common.jdbc.context.JdbcContext;
32 import org.kuali.common.jdbc.listener.BucketEvent;
33 import org.kuali.common.jdbc.listener.LogSqlListener;
34 import org.kuali.common.jdbc.listener.MultiThreadedExecutionListener;
35 import org.kuali.common.jdbc.listener.NotifyingListener;
36 import org.kuali.common.jdbc.listener.SqlEvent;
37 import org.kuali.common.jdbc.listener.SqlExecutionEvent;
38 import org.kuali.common.jdbc.listener.SqlListener;
39 import org.kuali.common.jdbc.listener.SqlMetaDataEvent;
40 import org.kuali.common.jdbc.supplier.SimpleStringSupplier;
41 import org.kuali.common.jdbc.supplier.SqlSupplier;
42 import org.kuali.common.jdbc.threads.SqlBucket;
43 import org.kuali.common.jdbc.threads.SqlBucketContext;
44 import org.kuali.common.jdbc.threads.SqlBucketHandler;
45 import org.kuali.common.threads.ExecutionStatistics;
46 import org.kuali.common.threads.ThreadHandlerContext;
47 import org.kuali.common.threads.ThreadInvoker;
48 import org.kuali.common.util.CollectionUtils;
49 import org.kuali.common.util.FormatUtils;
50 import org.kuali.common.util.PercentCompleteInformer;
51 import org.kuali.common.util.Str;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import org.springframework.jdbc.datasource.DataSourceUtils;
55
56 public class DefaultJdbcService implements JdbcService {
57
58 private static final Logger logger = LoggerFactory.getLogger(DefaultJdbcService.class);
59
60 @Override
61 public ExecutionResult executeSql(JdbcContext context) {
62 long updateCount = 0;
63 long start = System.currentTimeMillis();
64
65
66 if (!StringUtils.isBlank(context.getMessage())) {
67 logger.info(context.getMessage());
68 }
69
70
71 if (CollectionUtils.isEmpty(context.getSuppliers())) {
72 logger.info("Skipping execution. No suppliers");
73 return new ExecutionResult(0, start, System.currentTimeMillis());
74 }
75
76
77 if (!context.isSkipMetaData()) {
78 doMetaData(context);
79 }
80
81
82 long sqlStart = System.currentTimeMillis();
83 context.getListener().beforeExecution(new SqlExecutionEvent(context, start, -1));
84
85
86 if (context.isMultithreaded()) {
87 updateCount = executeMultiThreaded(context);
88 } else {
89 updateCount = executeSequentially(context);
90 }
91
92
93 context.getListener().afterExecution(new SqlExecutionEvent(context, sqlStart, System.currentTimeMillis()));
94
95 return new ExecutionResult(updateCount, start, System.currentTimeMillis());
96 }
97
98 protected void doMetaData(JdbcContext context) {
99
100 logger.debug("doMetaData()");
101
102
103 long start = System.currentTimeMillis();
104 context.getListener().beforeMetaData(new SqlMetaDataEvent(context, start, -1));
105
106
107 for (SqlSupplier supplier : context.getSuppliers()) {
108 supplier.fillInMetaData();
109 }
110
111
112 context.getListener().afterMetaData(new SqlMetaDataEvent(context, start, System.currentTimeMillis()));
113 }
114
115 protected long executeMultiThreaded(JdbcContext context) {
116
117
118 List<SqlBucket> buckets = getSqlBuckets(context);
119
120
121 context.getListener().bucketsCreated(new BucketEvent(context, buckets));
122
123
124 Collections.sort(buckets);
125 Collections.reverse(buckets);
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 long total = JdbcUtils.getSqlCount(context.getSuppliers());
141 PercentCompleteInformer informer = new PercentCompleteInformer(total);
142 MultiThreadedExecutionListener etl = new MultiThreadedExecutionListener();
143 etl.setTrackProgressByUpdateCount(context.isTrackProgressByUpdateCount());
144 etl.setInformer(informer);
145 List<SqlListener> listeners = new ArrayList<SqlListener>();
146 listeners.add(new LogSqlListener());
147 listeners.add(etl);
148 NotifyingListener nl = new NotifyingListener(listeners);
149
150
151 List<SqlBucketContext> sbcs = getSqlBucketContexts(buckets, context, nl);
152
153
154 ThreadHandlerContext<SqlBucketContext> thc = new ThreadHandlerContext<SqlBucketContext>();
155 thc.setList(sbcs);
156 thc.setHandler(new SqlBucketHandler());
157 thc.setMax(buckets.size());
158 thc.setMin(buckets.size());
159 thc.setDivisor(1);
160
161
162 ThreadInvoker invoker = new ThreadInvoker();
163 ExecutionStatistics stats = invoker.invokeThreads(thc);
164 informer.stop();
165
166
167 long aggregateTime = etl.getAggregateTime();
168 long wallTime = stats.getExecutionTime();
169 String avgMillis = FormatUtils.getTime(aggregateTime / buckets.size());
170 String aTime = FormatUtils.getTime(aggregateTime);
171 String wTime = FormatUtils.getTime(wallTime);
172 String sqlCount = FormatUtils.getCount(etl.getAggregateSqlCount());
173 String sqlSize = FormatUtils.getSize(etl.getAggregateSqlSize());
174 Object[] args = { buckets.size(), wTime, aTime, avgMillis, sqlCount, sqlSize };
175 logger.info("Threads - [count: {} time: {} aggregate: {} avg: {} sql: {} - {}]", args);
176
177 return etl.getAggregateUpdateCount();
178 }
179
180 @Override
181 public ExecutionResult executeSql(DataSource dataSource, String sql) {
182 return executeSql(dataSource, Arrays.asList(sql));
183 }
184
185 @Override
186 public ExecutionResult executeSql(DataSource dataSource, List<String> sql) {
187 SqlSupplier supplier = new SimpleStringSupplier(sql);
188 JdbcContext context = new JdbcContext();
189 context.setDataSource(dataSource);
190 context.setSuppliers(Arrays.asList(supplier));
191 return executeSql(context);
192 }
193
194 protected List<SqlBucketContext> getSqlBucketContexts(List<SqlBucket> buckets, JdbcContext context, SqlListener listener) {
195 List<SqlBucketContext> sbcs = new ArrayList<SqlBucketContext>();
196
197 for (SqlBucket bucket : buckets) {
198
199 JdbcContext newJdbcContext = getJdbcContext(context, bucket, listener);
200
201 SqlBucketContext sbc = new SqlBucketContext();
202 sbc.setService(this);
203 sbc.setBucket(bucket);
204 sbc.setContext(newJdbcContext);
205 sbcs.add(sbc);
206 }
207 return sbcs;
208 }
209
210 protected JdbcContext getJdbcContext(JdbcContext original, SqlBucket bucket, SqlListener listener) {
211 JdbcContext context = new JdbcContext();
212 context.setSuppliers(bucket.getSuppliers());
213 context.setDataSource(original.getDataSource());
214 context.setCommitMode(original.getCommitMode());
215 context.setThreads(1);
216 context.setSkip(original.isSkip());
217 context.setListener(listener);
218 context.setSkipMetaData(true);
219 return context;
220 }
221
222 protected List<SqlBucket> getSqlBuckets(JdbcContext context) {
223
224
225 List<SqlSupplier> suppliers = context.getSuppliers();
226
227
228 int bucketCount = Math.min(context.getThreads(), suppliers.size());
229
230
231 Collections.sort(suppliers);
232
233
234 Collections.reverse(suppliers);
235
236
237 List<SqlBucket> buckets = CollectionUtils.getNewList(SqlBucket.class, bucketCount);
238
239
240
241 for (SqlSupplier supplier : suppliers) {
242
243
244 Collections.sort(buckets);
245
246
247 SqlBucket smallest = buckets.get(0);
248
249
250 smallest.getSuppliers().add(supplier);
251
252
253 smallest.setCount(smallest.getCount() + supplier.getMetaData().getCount());
254 smallest.setSize(smallest.getSize() + supplier.getMetaData().getSize());
255 }
256
257
258 return buckets;
259 }
260
261 protected long executeSequentially(JdbcContext context) {
262 Connection conn = null;
263 Statement statement = null;
264 try {
265 long updateCount = 0;
266 conn = DataSourceUtils.doGetConnection(context.getDataSource());
267 boolean originalAutoCommitSetting = conn.getAutoCommit();
268 conn.setAutoCommit(false);
269 statement = conn.createStatement();
270 List<SqlSupplier> suppliers = context.getSuppliers();
271 for (SqlSupplier supplier : suppliers) {
272 updateCount += excecuteSupplier(statement, context, supplier);
273 conn.commit();
274 }
275 conn.setAutoCommit(originalAutoCommitSetting);
276 return updateCount;
277 } catch (Exception e) {
278 throw new IllegalStateException(e);
279 } finally {
280 JdbcUtils.closeQuietly(context.getDataSource(), conn, statement);
281 }
282 }
283
284 protected long excecuteSupplier(Statement statement, JdbcContext context, SqlSupplier supplier) throws SQLException {
285 try {
286 long updateCount = 0;
287 supplier.open();
288 List<String> sql = supplier.getSql();
289 while (sql != null) {
290 for (String s : sql) {
291 updateCount += executeSql(statement, s, context);
292 }
293 sql = supplier.getSql();
294 }
295 return updateCount;
296 } catch (IOException e) {
297 throw new IllegalStateException(e);
298 } finally {
299 supplier.close();
300 }
301 }
302
303 protected int executeSql(Statement statement, String sql, JdbcContext context) throws SQLException {
304 try {
305 int updateCount = 0;
306 long start = System.currentTimeMillis();
307 context.getListener().beforeExecuteSql(new SqlEvent(sql, start));
308 if (!context.isSkip()) {
309
310
311 statement.execute(sql);
312
313
314 updateCount = statement.getUpdateCount();
315
316
317 updateCount = (updateCount == -1) ? 0 : updateCount;
318 }
319 context.getListener().afterExecuteSql(new SqlEvent(sql, updateCount, start, System.currentTimeMillis()));
320 return updateCount;
321 } catch (SQLException e) {
322 throw new SQLException("Error executing SQL [" + Str.flatten(sql) + "]", e);
323 }
324 }
325
326 @Override
327 public JdbcMetaData getJdbcMetaData(DataSource dataSource) {
328 Connection conn = null;
329 try {
330 conn = DataSourceUtils.doGetConnection(dataSource);
331 DatabaseMetaData dbmd = conn.getMetaData();
332 return getJdbcMetaData(dbmd);
333 } catch (Exception e) {
334 throw new IllegalStateException(e);
335 } finally {
336 logger.trace("closing connection");
337 JdbcUtils.closeQuietly(dataSource, conn);
338 }
339 }
340
341 protected JdbcMetaData getJdbcMetaData(DatabaseMetaData dbmd) throws SQLException {
342 JdbcMetaData md = new JdbcMetaData();
343 md.setDatabaseProductName(dbmd.getDatabaseProductName());
344 md.setDatabaseProductVersion(dbmd.getDatabaseProductVersion());
345 md.setDriverName(dbmd.getDriverName());
346 md.setDriverVersion(dbmd.getDriverVersion());
347 md.setUrl(dbmd.getURL());
348 md.setUsername(dbmd.getUserName());
349 return md;
350 }
351 }