/**
 * Copyright 2010-2012 The Kuali Foundation
 *
 * Licensed under the Educational Community License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.opensource.org/licenses/ecl2.php
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.kuali.maven.wagon;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.maven.wagon.ResourceDoesNotExistException;
import org.apache.maven.wagon.TransferFailedException;
import org.apache.maven.wagon.authentication.AuthenticationException;
import org.apache.maven.wagon.authentication.AuthenticationInfo;
import org.apache.maven.wagon.proxy.ProxyInfo;
import org.apache.maven.wagon.repository.Repository;
import org.apache.maven.wagon.repository.RepositoryPermissions;
import org.kuali.common.threads.ExecutionStatistics;
import org.kuali.common.threads.ThreadHandlerContext;
import org.kuali.common.threads.ThreadInvoker;
import org.kuali.common.threads.listener.PercentCompleteListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.internal.Mimetypes;
import com.amazonaws.services.s3.internal.RepeatableFileInputStream;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;

61  /**
62   * <p>
63   * An implementation of the Maven Wagon interface that is integrated with the Amazon S3 service.
64   * </p>
65   * 
66   * <p>
67   * URLs that reference the S3 service should be in the form of <code>s3://bucket.name</code>. As an example
68   * <code>s3://maven.kuali.org</code> puts files into the <code>maven.kuali.org</code> bucket on the S3 service.
69   * </p>
70   * 
71   * <p>
72   * This implementation uses the <code>username</code> and <code>password</code> portions of the server authentication metadata for
73   * credentials.
74   * </p>
75   * 
76   * @plexus.component role="org.apache.maven.wagon.Wagon" role-hint="http" instantiation-strategy="per-lookup"
77   * 
78   * @author Ben Hale
79   * @author Jeff Caddel
80   */
81  public class S3Wagon extends AbstractWagon implements RequestFactory {
82  	public static final String MIN_THREADS_KEY = "maven.wagon.threads.min";
83  	public static final String MAX_THREADS_KEY = "maven.wagon.threads.max";
84  	public static final String DIVISOR_KEY = "maven.wagon.threads.divisor";
85  	public static final int DEFAULT_MIN_THREAD_COUNT = 10;
86  	public static final int DEFAULT_MAX_THREAD_COUNT = 50;
87  	public static final int DEFAULT_DIVISOR = 50;
88  	public static final int DEFAULT_READ_TIMEOUT = 60 * 1000;
89  	public static final CannedAccessControlList DEFAULT_ACL = CannedAccessControlList.PublicRead;
90  
91  	ThreadInvoker invoker = new ThreadInvoker();
92  	SimpleFormatter formatter = new SimpleFormatter();
93  	int minThreads = getMinThreads();
94  	int maxThreads = getMaxThreads();
95  	int divisor = getDivisor();
96  	int readTimeout = DEFAULT_READ_TIMEOUT;
97  	CannedAccessControlList acl = DEFAULT_ACL;
98  	TransferManager transferManager;
99  
100 	private static final Logger log = LoggerFactory.getLogger(S3Wagon.class);
101 
102 	private AmazonS3Client client;
103 
104 	private String bucketName;
105 
106 	private String basedir;
107 
108 	private final Mimetypes mimeTypes = Mimetypes.getInstance();
109 
110 	public S3Wagon() {
111 		super(true);
112 		S3Listener listener = new S3Listener();
113 		super.addSessionListener(listener);
114 		super.addTransferListener(listener);
115 	}
116 
117 	protected void ensureBucketExists(AmazonS3Client client, String bucketName) {
118 		log.debug("Looking for bucket: " + bucketName);
119 		if (client.doesBucketExist(bucketName)) {
120 			log.debug("Found bucket " + bucketName + " Validating permissions");
121 			validatePermissions(client, bucketName);
122 		} else {
123 			log.info("Creating bucket " + bucketName);
124 			// If we create the bucket, we "own" it and by default have the "fullcontrol" permission
125 			client.createBucket(bucketName);
126 		}
127 	}
128 
129 	/**
130 	 * Establish that we have enough permissions on this bucket to do what we need to do
131 	 */
132 	protected void validatePermissions(AmazonS3Client client, String bucketName) {
133 		// This establishes our ability to list objects in this bucket
134 		ListObjectsRequest zeroObjectsRequest = new ListObjectsRequest(bucketName, null, null, null, 0);
135 		client.listObjects(zeroObjectsRequest);
136 
137 		/**
138 		 * The current AWS Java SDK does not appear to have a simple method for discovering what set of permissions the currently
139 		 * authenticated user has on a bucket. The AWS dev's suggest that you attempt to perform an operation that would fail if you don't
140 		 * have the permission in question. You would then use the success/failure of that attempt to establish what your permissions are.
141 		 * This is definitely not ideal and they are working on it, but it is not ready yet.
142 		 */
143 
144 		// Do something simple and quick to verify that we have write permissions on this bucket
145 		// One way to do this would be to create an object in this bucket, and then immediately delete it
146 		// That seems messy, inconvenient, and lame.
147 
148 	}
149 
150 	protected CannedAccessControlList getAclFromRepository(Repository repository) {
151 		RepositoryPermissions permissions = repository.getPermissions();
152 		if (permissions == null) {
153 			return null;
154 		}
155 		String filePermissions = permissions.getFileMode();
156 		if (StringUtils.isBlank(filePermissions)) {
157 			return null;
158 		}
159 		return CannedAccessControlList.valueOf(filePermissions.trim());
160 	}
161 
162 	@Override
163 	protected void connectToRepository(Repository source, AuthenticationInfo auth, ProxyInfo proxy) throws AuthenticationException {
164 
165 		AWSCredentials credentials = getCredentials(auth);
166 		this.client = new AmazonS3Client(credentials);
167 		this.transferManager = new TransferManager(credentials);
168 		this.bucketName = source.getHost();
169 		ensureBucketExists(client, bucketName);
170 		this.basedir = getBaseDir(source);
171 
172 		// If they've specified <filePermissions> in settings.xml, that always wins
173 		CannedAccessControlList repoAcl = getAclFromRepository(source);
174 		if (repoAcl != null) {
175 			log.info("File permissions: " + repoAcl.name());
176 			acl = repoAcl;
177 		}
178 	}
179 
180 	@Override
181 	protected boolean doesRemoteResourceExist(final String resourceName) {
182 		try {
183 			client.getObjectMetadata(bucketName, basedir + resourceName);
184 		} catch (AmazonClientException e) {
185 			return false;
186 		}
187 		return true;
188 	}
189 
190 	@Override
191 	protected void disconnectFromRepository() {
192 		// Nothing to do for S3
193 	}
194 
195 	/**
196 	 * Pull an object out of an S3 bucket and write it to a file
197 	 */
198 	@Override
199 	protected void getResource(final String resourceName, final File destination, final TransferProgress progress) throws ResourceDoesNotExistException, IOException {
200 		// Obtain the object from S3
201 		S3Object object = null;
202 		try {
203 			String key = basedir + resourceName;
204 			object = client.getObject(bucketName, key);
205 		} catch (Exception e) {
206 			throw new ResourceDoesNotExistException("Resource " + resourceName + " does not exist in the repository", e);
207 		}
208 
209 		//
210 		InputStream in = null;
211 		OutputStream out = null;
212 		try {
213 			in = object.getObjectContent();
214 			out = new TransferProgressFileOutputStream(destination, progress);
215 			byte[] buffer = new byte[1024];
216 			int length;
217 			while ((length = in.read(buffer)) != -1) {
218 				out.write(buffer, 0, length);
219 			}
220 		} finally {
221 			IOUtils.closeQuietly(in);
222 			IOUtils.closeQuietly(out);
223 		}
224 	}
225 
226 	/**
227 	 * Is the S3 object newer than the timestamp passed in?
228 	 */
229 	@Override
230 	protected boolean isRemoteResourceNewer(final String resourceName, final long timestamp) {
231 		ObjectMetadata metadata = client.getObjectMetadata(bucketName, basedir + resourceName);
232 		return metadata.getLastModified().compareTo(new Date(timestamp)) < 0;
233 	}
234 
235 	/**
236 	 * List all of the objects in a given directory
237 	 */
238 	@Override
239 	protected List<String> listDirectory(String directory) throws Exception {
240 		// info("directory=" + directory);
241 		if (StringUtils.isBlank(directory)) {
242 			directory = "";
243 		}
244 		String delimiter = "/";
245 		String prefix = basedir + directory;
246 		if (!prefix.endsWith(delimiter)) {
247 			prefix += delimiter;
248 		}
249 		// info("prefix=" + prefix);
250 		ListObjectsRequest request = new ListObjectsRequest();
251 		request.setBucketName(bucketName);
252 		request.setPrefix(prefix);
253 		request.setDelimiter(delimiter);
254 		ObjectListing objectListing = client.listObjects(request);
255 		// info("truncated=" + objectListing.isTruncated());
256 		// info("prefix=" + prefix);
257 		// info("basedir=" + basedir);
258 		List<String> fileNames = new ArrayList<String>();
259 		for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
260 			// info("summary.getKey()=" + summary.getKey());
261 			String key = summary.getKey();
262 			String relativeKey = key.startsWith(basedir) ? key.substring(basedir.length()) : key;
263 			boolean add = !StringUtils.isBlank(relativeKey) && !relativeKey.equals(directory);
264 			if (add) {
265 				// info("Adding key - " + relativeKey);
266 				fileNames.add(relativeKey);
267 			}
268 		}
269 		for (String commonPrefix : objectListing.getCommonPrefixes()) {
270 			String value = commonPrefix.startsWith(basedir) ? commonPrefix.substring(basedir.length()) : commonPrefix;
271 			// info("commonPrefix=" + commonPrefix);
272 			// info("relativeValue=" + relativeValue);
273 			// info("Adding common prefix - " + value);
274 			fileNames.add(value);
275 		}
276 		// StringBuilder sb = new StringBuilder();
277 		// sb.append("\n");
278 		// for (String fileName : fileNames) {
279 		// sb.append(fileName + "\n");
280 		// }
281 		// info(sb.toString());
282 		return fileNames;
283 	}
284 
285 	protected void info(String msg) {
286 		System.out.println("[INFO] " + msg);
287 	}
288 
289 	/**
290 	 * Normalize the key to our S3 object<br>
291 	 * 1. Convert "./css/style.css" into "/css/style.css"<br>
292 	 * 2. Convert "/foo/bar/../../css/style.css" into "/css/style.css"
293 	 * 
294 	 * @see java.net.URI.normalize()
295 	 */
296 	protected String getNormalizedKey(final File source, final String destination) {
297 		// Generate our bucket key for this file
298 		String key = basedir + destination;
299 		try {
300 			String prefix = "http://s3.amazonaws.com/" + bucketName + "/";
301 			String urlString = prefix + key;
302 			URI rawURI = new URI(urlString);
303 			URI normalizedURI = rawURI.normalize();
304 			String normalized = normalizedURI.toString();
305 			int pos = normalized.indexOf(prefix) + prefix.length();
306 			String normalizedKey = normalized.substring(pos);
307 			return normalizedKey;
308 		} catch (Exception e) {
309 			throw new RuntimeException(e);
310 		}
311 	}
312 
313 	protected ObjectMetadata getObjectMetadata(final File source, final String destination) {
314 		// Set the mime type according to the extension of the destination file
315 		String contentType = mimeTypes.getMimetype(destination);
316 		long contentLength = source.length();
317 
318 		ObjectMetadata omd = new ObjectMetadata();
319 		omd.setContentLength(contentLength);
320 		omd.setContentType(contentType);
321 		return omd;
322 	}
323 
324 	/**
325 	 * Create a PutObjectRequest based on the PutContext
326 	 */
327 	public PutObjectRequest getPutObjectRequest(PutFileContext context) {
328 		File source = context.getSource();
329 		String destination = context.getDestination();
330 		TransferProgress progress = context.getProgress();
331 		return getPutObjectRequest(source, destination, progress);
332 	}
333 
334 	protected InputStream getInputStream(File source, TransferProgress progress) throws FileNotFoundException {
335 		if (progress == null) {
336 			return new RepeatableFileInputStream(source);
337 		} else {
338 			return new TransferProgressFileInputStream(source, progress);
339 		}
340 	}
341 
342 	/**
343 	 * Create a PutObjectRequest based on the source file and destination passed in
344 	 */
345 	protected PutObjectRequest getPutObjectRequest(File source, String destination, TransferProgress progress) {
346 		try {
347 			String key = getNormalizedKey(source, destination);
348 			InputStream input = getInputStream(source, progress);
349 			ObjectMetadata metadata = getObjectMetadata(source, destination);
350 			PutObjectRequest request = new PutObjectRequest(bucketName, key, input, metadata);
351 			request.setCannedAcl(acl);
352 			return request;
353 		} catch (FileNotFoundException e) {
354 			throw new AmazonServiceException("File not found", e);
355 		}
356 	}
357 
358 	/**
359 	 * On S3 there are no true "directories". An S3 bucket is essentially a Hashtable of files stored by key. The integration between a
360 	 * traditional file system and an S3 bucket is to use the path of the file on the local file system as the key to the file in the
361 	 * bucket. The S3 bucket does not contain a separate key for the directory itself.
362 	 */
363 	public final void putDirectory(File sourceDir, String destinationDir) throws TransferFailedException {
364 
365 		// Examine the contents of the directory
366 		List<PutFileContext> contexts = getPutFileContexts(sourceDir, destinationDir);
367 		for (PutFileContext context : contexts) {
368 			// Progress is tracked by the thread handler when uploading files this way
369 			context.setProgress(null);
370 		}
371 
372 		// Sum the total bytes in the directory
373 		long bytes = sum(contexts);
374 
375 		// Show what we are up to
376 		log.info(getUploadStartMsg(contexts.size(), bytes));
377 
378 		// Store some context for the thread handler
379 		ThreadHandlerContext<PutFileContext> thc = new ThreadHandlerContext<PutFileContext>();
380 		thc.setList(contexts);
381 		thc.setHandler(new FileHandler());
382 		thc.setMax(maxThreads);
383 		thc.setMin(minThreads);
384 		thc.setDivisor(divisor);
385 		thc.setListener(new PercentCompleteListener<PutFileContext>());
386 
387 		// Invoke the threads
388 		ExecutionStatistics stats = invoker.invokeThreads(thc);
389 
390 		// Show some stats
391 		long millis = stats.getExecutionTime();
392 		long count = stats.getIterationCount();
393 		log.info(getUploadCompleteMsg(millis, bytes, count));
394 	}
395 
396 	protected String getUploadCompleteMsg(long millis, long bytes, long count) {
397 		String rate = formatter.getRate(millis, bytes);
398 		String time = formatter.getTime(millis);
399 		StringBuilder sb = new StringBuilder();
400 		sb.append("Files: " + count);
401 		sb.append("  Time: " + time);
402 		sb.append("  Rate: " + rate);
403 		return sb.toString();
404 	}
405 
406 	protected String getUploadStartMsg(int fileCount, long bytes) {
407 		StringBuilder sb = new StringBuilder();
408 		sb.append("Files: " + fileCount);
409 		sb.append("  Bytes: " + formatter.getSize(bytes));
410 		return sb.toString();
411 	}
412 
413 	protected int getRequestsPerThread(int threads, int requests) {
414 		int requestsPerThread = requests / threads;
415 		while (requestsPerThread * threads < requests) {
416 			requestsPerThread++;
417 		}
418 		return requestsPerThread;
419 	}
420 
421 	protected long sum(List<PutFileContext> contexts) {
422 		long sum = 0;
423 		for (PutFileContext context : contexts) {
424 			File file = context.getSource();
425 			long length = file.length();
426 			sum += length;
427 		}
428 		return sum;
429 	}
430 
431 	/**
432 	 * Store a resource into S3
433 	 */
434 	@Override
435 	protected void putResource(final File source, final String destination, final TransferProgress progress) throws IOException {
436 
437 		// Create a new S3Object
438 		PutObjectRequest request = getPutObjectRequest(source, destination, progress);
439 
440 		// Store the file on S3
441 		Upload upload = transferManager.upload(request);
442 		try {
443 			// Block and wait for the upload to finish
444 			upload.waitForCompletion();
445 		} catch (Exception e) {
446 			throw new IOException("Unexpected error uploading file", e);
447 		}
448 	}
449 
450 	protected String getDestinationPath(final String destination) {
451 		return destination.substring(0, destination.lastIndexOf('/'));
452 	}
453 
454 	/**
455 	 * Convert "/" -> ""<br>
456 	 * Convert "/snapshot/" -> "snapshot/"<br>
457 	 * Convert "/snapshot" -> "snapshot/"<br>
458 	 */
459 	protected String getBaseDir(final Repository source) {
460 		StringBuilder sb = new StringBuilder(source.getBasedir());
461 		sb.deleteCharAt(0);
462 		if (sb.length() == 0) {
463 			return "";
464 		}
465 		if (sb.charAt(sb.length() - 1) != '/') {
466 			sb.append('/');
467 		}
468 		return sb.toString();
469 	}
470 
471 	protected String getAuthenticationErrorMessage() {
472 		StringBuffer sb = new StringBuffer();
473 		sb.append("The S3 wagon needs AWS Access Key set as the username and AWS Secret Key set as the password. eg:\n");
474 		sb.append("<server>\n");
475 		sb.append("  <id>my.server</id>\n");
476 		sb.append("  <username>[AWS Access Key ID]</username>\n");
477 		sb.append("  <password>[AWS Secret Access Key]</password>\n");
478 		sb.append("</server>\n");
479 		return sb.toString();
480 	}
481 
482 	/**
483 	 * Create AWSCredentionals from the information in settings.xml
484 	 */
485 	protected AWSCredentials getCredentials(final AuthenticationInfo authenticationInfo) throws AuthenticationException {
486 		if (authenticationInfo == null) {
487 			throw new AuthenticationException(getAuthenticationErrorMessage());
488 		}
489 		String accessKey = authenticationInfo.getUserName();
490 		String secretKey = authenticationInfo.getPassword();
491 		if (accessKey == null || secretKey == null) {
492 			throw new AuthenticationException(getAuthenticationErrorMessage());
493 		}
494 		return new BasicAWSCredentials(accessKey, secretKey);
495 	}
496 
497 	/*
498 	 * (non-Javadoc)
499 	 * 
500 	 * @see org.kuali.maven.wagon.AbstractWagon#getPutFileContext(java.io.File, java.lang.String)
501 	 */
502 	@Override
503 	protected PutFileContext getPutFileContext(File source, String destination) {
504 		PutFileContext context = super.getPutFileContext(source, destination);
505 		context.setFactory(this);
506 		context.setTransferManager(this.transferManager);
507 		return context;
508 	}
509 
510 	protected int getMinThreads() {
511 		return getValue(MIN_THREADS_KEY, DEFAULT_MIN_THREAD_COUNT);
512 	}
513 
514 	protected int getMaxThreads() {
515 		return getValue(MAX_THREADS_KEY, DEFAULT_MAX_THREAD_COUNT);
516 	}
517 
518 	protected int getDivisor() {
519 		return getValue(DIVISOR_KEY, DEFAULT_DIVISOR);
520 	}
521 
522 	protected int getValue(String key, int defaultValue) {
523 		String value = System.getProperty(key);
524 		if (StringUtils.isEmpty(value)) {
525 			return defaultValue;
526 		} else {
527 			return new Integer(value);
528 		}
529 	}
530 
531 	public int getReadTimeout() {
532 		return readTimeout;
533 	}
534 
535 	public void setReadTimeout(int readTimeout) {
536 		this.readTimeout = readTimeout;
537 	}
538 
539 }