View Javadoc

1   /**
2    * Copyright 2010-2013 The Kuali Foundation
3    *
4    * Licensed under the Educational Community License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.opensource.org/licenses/ecl2.php
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.kuali.common.jdbc;
17  
18  import java.io.IOException;
19  import java.sql.Connection;
20  import java.sql.DatabaseMetaData;
21  import java.sql.SQLException;
22  import java.sql.Statement;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.List;
27  
28  import javax.sql.DataSource;
29  
30  import org.apache.commons.lang3.StringUtils;
31  import org.kuali.common.jdbc.context.JdbcContext;
32  import org.kuali.common.jdbc.listener.BucketEvent;
33  import org.kuali.common.jdbc.listener.LogSqlListener;
34  import org.kuali.common.jdbc.listener.MultiThreadedExecutionListener;
35  import org.kuali.common.jdbc.listener.NotifyingListener;
36  import org.kuali.common.jdbc.listener.SqlEvent;
37  import org.kuali.common.jdbc.listener.SqlExecutionEvent;
38  import org.kuali.common.jdbc.listener.SqlListener;
39  import org.kuali.common.jdbc.listener.SqlMetaDataEvent;
40  import org.kuali.common.jdbc.supplier.SimpleStringSupplier;
41  import org.kuali.common.jdbc.supplier.SqlSupplier;
42  import org.kuali.common.jdbc.threads.SqlBucket;
43  import org.kuali.common.jdbc.threads.SqlBucketContext;
44  import org.kuali.common.jdbc.threads.SqlBucketHandler;
45  import org.kuali.common.threads.ExecutionStatistics;
46  import org.kuali.common.threads.ThreadHandlerContext;
47  import org.kuali.common.threads.ThreadInvoker;
48  import org.kuali.common.util.CollectionUtils;
49  import org.kuali.common.util.FormatUtils;
50  import org.kuali.common.util.PercentCompleteInformer;
51  import org.kuali.common.util.Str;
52  import org.slf4j.Logger;
53  import org.slf4j.LoggerFactory;
54  import org.springframework.jdbc.datasource.DataSourceUtils;
55  
56  public class DefaultJdbcService implements JdbcService {
57  
58  	private static final Logger logger = LoggerFactory.getLogger(DefaultJdbcService.class);
59  
60  	@Override
61  	public ExecutionResult executeSql(JdbcContext context) {
62  		long updateCount = 0;
63  		long start = System.currentTimeMillis();
64  
65  		// Log a message if provided
66  		if (!StringUtils.isBlank(context.getMessage())) {
67  			logger.info(context.getMessage());
68  		}
69  
70  		// Make sure we have something to do
71  		if (CollectionUtils.isEmpty(context.getSuppliers())) {
72  			logger.info("Skipping execution.  No suppliers");
73  			return new ExecutionResult(0, start, System.currentTimeMillis());
74  		}
75  
76  		// Calculate metadata
77  		if (!context.isSkipMetaData()) {
78  			doMetaData(context);
79  		}
80  
81  		// Fire an event before executing any SQL
82  		long sqlStart = System.currentTimeMillis();
83  		context.getListener().beforeExecution(new SqlExecutionEvent(context, start, -1));
84  
85  		// Execute the SQL as dictated by the context
86  		if (context.isMultithreaded()) {
87  			updateCount = executeMultiThreaded(context);
88  		} else {
89  			updateCount = executeSequentially(context);
90  		}
91  
92  		// Fire an event now that all SQL execution is complete
93  		context.getListener().afterExecution(new SqlExecutionEvent(context, sqlStart, System.currentTimeMillis()));
94  
95  		return new ExecutionResult(updateCount, start, System.currentTimeMillis());
96  	}
97  
98  	protected void doMetaData(JdbcContext context) {
99  
100 		logger.debug("doMetaData()");
101 
102 		// Fire an event before we begin calculating metadata
103 		long start = System.currentTimeMillis();
104 		context.getListener().beforeMetaData(new SqlMetaDataEvent(context, start, -1));
105 
106 		// Fill in SQL metadata
107 		for (SqlSupplier supplier : context.getSuppliers()) {
108 			supplier.fillInMetaData();
109 		}
110 
111 		// Fire an event now that metadata calculation is complete
112 		context.getListener().afterMetaData(new SqlMetaDataEvent(context, start, System.currentTimeMillis()));
113 	}
114 
115 	protected long executeMultiThreaded(JdbcContext context) {
116 
117 		// Divide the SQL we have to execute up into buckets as "evenly" as possible
118 		List<SqlBucket> buckets = getSqlBuckets(context);
119 
120 		// Notify the listener now that buckets are created
121 		context.getListener().bucketsCreated(new BucketEvent(context, buckets));
122 
123 		// Sort the buckets largest to smallest
124 		Collections.sort(buckets);
125 		Collections.reverse(buckets);
126 
127 		// The tracking built into the kuali-threads package assumes "progress" equals one element from the list completing
128 		// It assumes you have a gigantic list where each element in the list = 1 unit of work
129 		// A large list of files that need to be posted to S3 (for example).
130 		// If we could randomly split up the SQL and execute it in whatever order we wanted, the built in tracking would work.
131 		// We cannot do that though, since the SQL in each file needs to execute sequentially in order
132 		// SQL from different files can execute concurrently, but the SQL inside each file needs to execute in order
133 		// For example OLE has ~250,000 SQL statements split up across ~300 files
134 		// In addition, the schema related DDL files need to execute first, then data, then constraints DDL files
135 		// Some files are HUGE, some are tiny. Printing a dot after each file completes doesn't make sense.
136 		// Our list of buckets is pretty small, even though the total number of SQL statements is quite large
137 		// Only printing a dot to the console when each bucket completes is not granular enough
138 
139 		// This listener prints a dot each time 1% of the total number of SQL statements across all of the buckets has been executed.
140 		long total = JdbcUtils.getSqlCount(context.getSuppliers());
141 		PercentCompleteInformer informer = new PercentCompleteInformer(total);
142 		MultiThreadedExecutionListener etl = new MultiThreadedExecutionListener();
143 		etl.setTrackProgressByUpdateCount(context.isTrackProgressByUpdateCount());
144 		etl.setInformer(informer);
145 		List<SqlListener> listeners = new ArrayList<SqlListener>();
146 		listeners.add(new LogSqlListener());
147 		listeners.add(etl);
148 		NotifyingListener nl = new NotifyingListener(listeners);
149 
150 		// Provide some context for each bucket
151 		List<SqlBucketContext> sbcs = getSqlBucketContexts(buckets, context, nl);
152 
153 		// Store some context for the thread handler
154 		ThreadHandlerContext<SqlBucketContext> thc = new ThreadHandlerContext<SqlBucketContext>();
155 		thc.setList(sbcs);
156 		thc.setHandler(new SqlBucketHandler());
157 		thc.setMax(buckets.size());
158 		thc.setMin(buckets.size());
159 		thc.setDivisor(1);
160 
161 		// Start threads to execute SQL from multiple suppliers concurrently
162 		ThreadInvoker invoker = new ThreadInvoker();
163 		ExecutionStatistics stats = invoker.invokeThreads(thc);
164 		informer.stop();
165 
166 		// Display thread related stats
167 		long aggregateTime = etl.getAggregateTime();
168 		long wallTime = stats.getExecutionTime();
169 		String avgMillis = FormatUtils.getTime(aggregateTime / buckets.size());
170 		String aTime = FormatUtils.getTime(aggregateTime);
171 		String wTime = FormatUtils.getTime(wallTime);
172 		String sqlCount = FormatUtils.getCount(etl.getAggregateSqlCount());
173 		String sqlSize = FormatUtils.getSize(etl.getAggregateSqlSize());
174 		Object[] args = { buckets.size(), wTime, aTime, avgMillis, sqlCount, sqlSize };
175 		logger.info("Threads - [count: {}  time: {}  aggregate: {}  avg: {}  sql: {} - {}]", args);
176 
177 		return etl.getAggregateUpdateCount();
178 	}
179 
180 	@Override
181 	public ExecutionResult executeSql(DataSource dataSource, String sql) {
182 		return executeSql(dataSource, Arrays.asList(sql));
183 	}
184 
185 	@Override
186 	public ExecutionResult executeSql(DataSource dataSource, List<String> sql) {
187 		SqlSupplier supplier = new SimpleStringSupplier(sql);
188 		JdbcContext context = new JdbcContext();
189 		context.setDataSource(dataSource);
190 		context.setSuppliers(Arrays.asList(supplier));
191 		return executeSql(context);
192 	}
193 
194 	protected List<SqlBucketContext> getSqlBucketContexts(List<SqlBucket> buckets, JdbcContext context, SqlListener listener) {
195 		List<SqlBucketContext> sbcs = new ArrayList<SqlBucketContext>();
196 
197 		for (SqlBucket bucket : buckets) {
198 
199 			JdbcContext newJdbcContext = getJdbcContext(context, bucket, listener);
200 
201 			SqlBucketContext sbc = new SqlBucketContext();
202 			sbc.setService(this);
203 			sbc.setBucket(bucket);
204 			sbc.setContext(newJdbcContext);
205 			sbcs.add(sbc);
206 		}
207 		return sbcs;
208 	}
209 
210 	protected JdbcContext getJdbcContext(JdbcContext original, SqlBucket bucket, SqlListener listener) {
211 		JdbcContext context = new JdbcContext();
212 		context.setSuppliers(bucket.getSuppliers());
213 		context.setDataSource(original.getDataSource());
214 		context.setCommitMode(original.getCommitMode());
215 		context.setThreads(1);
216 		context.setSkip(original.isSkip());
217 		context.setListener(listener);
218 		context.setSkipMetaData(true);
219 		return context;
220 	}
221 
222 	protected List<SqlBucket> getSqlBuckets(JdbcContext context) {
223 
224 		// Pull out our list of suppliers
225 		List<SqlSupplier> suppliers = context.getSuppliers();
226 
227 		// number of buckets equals thread count, unless thread count > total number of sources
228 		int bucketCount = Math.min(context.getThreads(), suppliers.size());
229 
230 		// Sort the suppliers by SQL size
231 		Collections.sort(suppliers);
232 
233 		// Largest to smallest instead of smallest to largest
234 		Collections.reverse(suppliers);
235 
236 		// Allocate some buckets to hold the sql
237 		List<SqlBucket> buckets = CollectionUtils.getNewList(SqlBucket.class, bucketCount);
238 
239 		// Distribute the sources into buckets as evenly as possible
240 		// "Evenly" in this case means each bucket should be roughly the same size
241 		for (SqlSupplier supplier : suppliers) {
242 
243 			// Sort the buckets by size
244 			Collections.sort(buckets);
245 
246 			// First bucket in the list is the smallest
247 			SqlBucket smallest = buckets.get(0);
248 
249 			// Add this source to the bucket
250 			smallest.getSuppliers().add(supplier);
251 
252 			// Update the bucket metadata holding overall size
253 			smallest.setCount(smallest.getCount() + supplier.getMetaData().getCount());
254 			smallest.setSize(smallest.getSize() + supplier.getMetaData().getSize());
255 		}
256 
257 		// Return the buckets
258 		return buckets;
259 	}
260 
261 	protected long executeSequentially(JdbcContext context) {
262 		Connection conn = null;
263 		Statement statement = null;
264 		try {
265 			long updateCount = 0;
266 			conn = DataSourceUtils.doGetConnection(context.getDataSource());
267 			boolean originalAutoCommitSetting = conn.getAutoCommit();
268 			conn.setAutoCommit(false);
269 			statement = conn.createStatement();
270 			List<SqlSupplier> suppliers = context.getSuppliers();
271 			for (SqlSupplier supplier : suppliers) {
272 				updateCount += excecuteSupplier(statement, context, supplier);
273 				conn.commit();
274 			}
275 			conn.setAutoCommit(originalAutoCommitSetting);
276 			return updateCount;
277 		} catch (Exception e) {
278 			throw new IllegalStateException(e);
279 		} finally {
280 			JdbcUtils.closeQuietly(context.getDataSource(), conn, statement);
281 		}
282 	}
283 
284 	protected long excecuteSupplier(Statement statement, JdbcContext context, SqlSupplier supplier) throws SQLException {
285 		try {
286 			long updateCount = 0;
287 			supplier.open();
288 			List<String> sql = supplier.getSql();
289 			while (sql != null) {
290 				for (String s : sql) {
291 					updateCount += executeSql(statement, s, context);
292 				}
293 				sql = supplier.getSql();
294 			}
295 			return updateCount;
296 		} catch (IOException e) {
297 			throw new IllegalStateException(e);
298 		} finally {
299 			supplier.close();
300 		}
301 	}
302 
303 	protected int executeSql(Statement statement, String sql, JdbcContext context) throws SQLException {
304 		try {
305 			int updateCount = 0;
306 			long start = System.currentTimeMillis();
307 			context.getListener().beforeExecuteSql(new SqlEvent(sql, start));
308 			if (!context.isSkip()) {
309 
310 				// Execute the SQL
311 				statement.execute(sql);
312 
313 				// Get the number of rows updated as a result of executing this SQL
314 				updateCount = statement.getUpdateCount();
315 
316 				// Some SQL statements have nothing to do with updating rows
317 				updateCount = (updateCount == -1) ? 0 : updateCount;
318 			}
319 			context.getListener().afterExecuteSql(new SqlEvent(sql, updateCount, start, System.currentTimeMillis()));
320 			return updateCount;
321 		} catch (SQLException e) {
322 			throw new SQLException("Error executing SQL [" + Str.flatten(sql) + "]", e);
323 		}
324 	}
325 
326 	@Override
327 	public JdbcMetaData getJdbcMetaData(DataSource dataSource) {
328 		Connection conn = null;
329 		try {
330 			conn = DataSourceUtils.doGetConnection(dataSource);
331 			DatabaseMetaData dbmd = conn.getMetaData();
332 			return getJdbcMetaData(dbmd);
333 		} catch (Exception e) {
334 			throw new IllegalStateException(e);
335 		} finally {
336 			logger.trace("closing connection");
337 			JdbcUtils.closeQuietly(dataSource, conn);
338 		}
339 	}
340 
341 	protected JdbcMetaData getJdbcMetaData(DatabaseMetaData dbmd) throws SQLException {
342 		JdbcMetaData md = new JdbcMetaData();
343 		md.setDatabaseProductName(dbmd.getDatabaseProductName());
344 		md.setDatabaseProductVersion(dbmd.getDatabaseProductVersion());
345 		md.setDriverName(dbmd.getDriverName());
346 		md.setDriverVersion(dbmd.getDriverVersion());
347 		md.setUrl(dbmd.getURL());
348 		md.setUsername(dbmd.getUserName());
349 		return md;
350 	}
351 }