1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.maven.wagon;
17
18 import java.io.File;
19 import java.io.FileNotFoundException;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.OutputStream;
23 import java.net.URI;
24 import java.util.ArrayList;
25 import java.util.Date;
26 import java.util.List;
27
28 import org.apache.commons.io.IOUtils;
29 import org.apache.commons.lang.StringUtils;
30 import org.apache.maven.wagon.ResourceDoesNotExistException;
31 import org.apache.maven.wagon.TransferFailedException;
32 import org.apache.maven.wagon.authentication.AuthenticationException;
33 import org.apache.maven.wagon.authentication.AuthenticationInfo;
34 import org.apache.maven.wagon.proxy.ProxyInfo;
35 import org.apache.maven.wagon.repository.Repository;
36 import org.apache.maven.wagon.repository.RepositoryPermissions;
37 import org.kuali.common.threads.ExecutionStatistics;
38 import org.kuali.common.threads.ThreadHandlerContext;
39 import org.kuali.common.threads.ThreadInvoker;
40 import org.kuali.common.threads.listener.PercentCompleteListener;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.amazonaws.AmazonClientException;
45 import com.amazonaws.AmazonServiceException;
46 import com.amazonaws.auth.AWSCredentials;
47 import com.amazonaws.auth.BasicAWSCredentials;
48 import com.amazonaws.services.s3.AmazonS3Client;
49 import com.amazonaws.services.s3.internal.Mimetypes;
50 import com.amazonaws.services.s3.internal.RepeatableFileInputStream;
51 import com.amazonaws.services.s3.model.CannedAccessControlList;
52 import com.amazonaws.services.s3.model.ListObjectsRequest;
53 import com.amazonaws.services.s3.model.ObjectListing;
54 import com.amazonaws.services.s3.model.ObjectMetadata;
55 import com.amazonaws.services.s3.model.PutObjectRequest;
56 import com.amazonaws.services.s3.model.S3Object;
57 import com.amazonaws.services.s3.model.S3ObjectSummary;
58 import com.amazonaws.services.s3.transfer.TransferManager;
59 import com.amazonaws.services.s3.transfer.Upload;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class S3Wagon extends AbstractWagon implements RequestFactory {
82 public static final String MIN_THREADS_KEY = "maven.wagon.threads.min";
83 public static final String MAX_THREADS_KEY = "maven.wagon.threads.max";
84 public static final String DIVISOR_KEY = "maven.wagon.threads.divisor";
85 public static final int DEFAULT_MIN_THREAD_COUNT = 10;
86 public static final int DEFAULT_MAX_THREAD_COUNT = 50;
87 public static final int DEFAULT_DIVISOR = 50;
88 public static final int DEFAULT_READ_TIMEOUT = 60 * 1000;
89 public static final CannedAccessControlList DEFAULT_ACL = CannedAccessControlList.PublicRead;
90
91 ThreadInvoker invoker = new ThreadInvoker();
92 SimpleFormatter formatter = new SimpleFormatter();
93 int minThreads = getMinThreads();
94 int maxThreads = getMaxThreads();
95 int divisor = getDivisor();
96 int readTimeout = DEFAULT_READ_TIMEOUT;
97 CannedAccessControlList acl = DEFAULT_ACL;
98 TransferManager transferManager;
99
100 private static final Logger log = LoggerFactory.getLogger(S3Wagon.class);
101
102 private AmazonS3Client client;
103
104 private String bucketName;
105
106 private String basedir;
107
108 private final Mimetypes mimeTypes = Mimetypes.getInstance();
109
110 public S3Wagon() {
111 super(true);
112 S3Listener listener = new S3Listener();
113 super.addSessionListener(listener);
114 super.addTransferListener(listener);
115 }
116
117 protected void ensureBucketExists(AmazonS3Client client, String bucketName) {
118 log.debug("Looking for bucket: " + bucketName);
119 if (client.doesBucketExist(bucketName)) {
120 log.debug("Found bucket " + bucketName + " Validating permissions");
121 validatePermissions(client, bucketName);
122 } else {
123 log.info("Creating bucket " + bucketName);
124
125 client.createBucket(bucketName);
126 }
127 }
128
129
130
131
132 protected void validatePermissions(AmazonS3Client client, String bucketName) {
133
134 ListObjectsRequest zeroObjectsRequest = new ListObjectsRequest(bucketName, null, null, null, 0);
135 client.listObjects(zeroObjectsRequest);
136
137
138
139
140
141
142
143
144
145
146
147
148 }
149
150 protected CannedAccessControlList getAclFromRepository(Repository repository) {
151 RepositoryPermissions permissions = repository.getPermissions();
152 if (permissions == null) {
153 return null;
154 }
155 String filePermissions = permissions.getFileMode();
156 if (StringUtils.isBlank(filePermissions)) {
157 return null;
158 }
159 return CannedAccessControlList.valueOf(filePermissions.trim());
160 }
161
162 @Override
163 protected void connectToRepository(Repository source, AuthenticationInfo auth, ProxyInfo proxy) throws AuthenticationException {
164
165 AWSCredentials credentials = getCredentials(auth);
166 this.client = new AmazonS3Client(credentials);
167 this.transferManager = new TransferManager(credentials);
168 this.bucketName = source.getHost();
169 ensureBucketExists(client, bucketName);
170 this.basedir = getBaseDir(source);
171
172
173 CannedAccessControlList repoAcl = getAclFromRepository(source);
174 if (repoAcl != null) {
175 log.info("File permissions: " + repoAcl.name());
176 acl = repoAcl;
177 }
178 }
179
180 @Override
181 protected boolean doesRemoteResourceExist(final String resourceName) {
182 try {
183 client.getObjectMetadata(bucketName, basedir + resourceName);
184 } catch (AmazonClientException e) {
185 return false;
186 }
187 return true;
188 }
189
190 @Override
191 protected void disconnectFromRepository() {
192
193 }
194
195
196
197
198 @Override
199 protected void getResource(final String resourceName, final File destination, final TransferProgress progress) throws ResourceDoesNotExistException, IOException {
200
201 S3Object object = null;
202 try {
203 String key = basedir + resourceName;
204 object = client.getObject(bucketName, key);
205 } catch (Exception e) {
206 throw new ResourceDoesNotExistException("Resource " + resourceName + " does not exist in the repository", e);
207 }
208
209
210 InputStream in = null;
211 OutputStream out = null;
212 try {
213 in = object.getObjectContent();
214 out = new TransferProgressFileOutputStream(destination, progress);
215 byte[] buffer = new byte[1024];
216 int length;
217 while ((length = in.read(buffer)) != -1) {
218 out.write(buffer, 0, length);
219 }
220 } finally {
221 IOUtils.closeQuietly(in);
222 IOUtils.closeQuietly(out);
223 }
224 }
225
226
227
228
229 @Override
230 protected boolean isRemoteResourceNewer(final String resourceName, final long timestamp) {
231 ObjectMetadata metadata = client.getObjectMetadata(bucketName, basedir + resourceName);
232 return metadata.getLastModified().compareTo(new Date(timestamp)) < 0;
233 }
234
235
236
237
238 @Override
239 protected List<String> listDirectory(String directory) throws Exception {
240
241 if (StringUtils.isBlank(directory)) {
242 directory = "";
243 }
244 String delimiter = "/";
245 String prefix = basedir + directory;
246 if (!prefix.endsWith(delimiter)) {
247 prefix += delimiter;
248 }
249
250 ListObjectsRequest request = new ListObjectsRequest();
251 request.setBucketName(bucketName);
252 request.setPrefix(prefix);
253 request.setDelimiter(delimiter);
254 ObjectListing objectListing = client.listObjects(request);
255
256
257
258 List<String> fileNames = new ArrayList<String>();
259 for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
260
261 String key = summary.getKey();
262 String relativeKey = key.startsWith(basedir) ? key.substring(basedir.length()) : key;
263 boolean add = !StringUtils.isBlank(relativeKey) && !relativeKey.equals(directory);
264 if (add) {
265
266 fileNames.add(relativeKey);
267 }
268 }
269 for (String commonPrefix : objectListing.getCommonPrefixes()) {
270 String value = commonPrefix.startsWith(basedir) ? commonPrefix.substring(basedir.length()) : commonPrefix;
271
272
273
274 fileNames.add(value);
275 }
276
277
278
279
280
281
282 return fileNames;
283 }
284
285 protected void info(String msg) {
286 System.out.println("[INFO] " + msg);
287 }
288
289
290
291
292
293
294
295
296 protected String getNormalizedKey(final File source, final String destination) {
297
298 String key = basedir + destination;
299 try {
300 String prefix = "http://s3.amazonaws.com/" + bucketName + "/";
301 String urlString = prefix + key;
302 URI rawURI = new URI(urlString);
303 URI normalizedURI = rawURI.normalize();
304 String normalized = normalizedURI.toString();
305 int pos = normalized.indexOf(prefix) + prefix.length();
306 String normalizedKey = normalized.substring(pos);
307 return normalizedKey;
308 } catch (Exception e) {
309 throw new RuntimeException(e);
310 }
311 }
312
313 protected ObjectMetadata getObjectMetadata(final File source, final String destination) {
314
315 String contentType = mimeTypes.getMimetype(destination);
316 long contentLength = source.length();
317
318 ObjectMetadata omd = new ObjectMetadata();
319 omd.setContentLength(contentLength);
320 omd.setContentType(contentType);
321 return omd;
322 }
323
324
325
326
327 public PutObjectRequest getPutObjectRequest(PutFileContext context) {
328 File source = context.getSource();
329 String destination = context.getDestination();
330 TransferProgress progress = context.getProgress();
331 return getPutObjectRequest(source, destination, progress);
332 }
333
334 protected InputStream getInputStream(File source, TransferProgress progress) throws FileNotFoundException {
335 if (progress == null) {
336 return new RepeatableFileInputStream(source);
337 } else {
338 return new TransferProgressFileInputStream(source, progress);
339 }
340 }
341
342
343
344
345 protected PutObjectRequest getPutObjectRequest(File source, String destination, TransferProgress progress) {
346 try {
347 String key = getNormalizedKey(source, destination);
348 InputStream input = getInputStream(source, progress);
349 ObjectMetadata metadata = getObjectMetadata(source, destination);
350 PutObjectRequest request = new PutObjectRequest(bucketName, key, input, metadata);
351 request.setCannedAcl(acl);
352 return request;
353 } catch (FileNotFoundException e) {
354 throw new AmazonServiceException("File not found", e);
355 }
356 }
357
358
359
360
361
362
363 public final void putDirectory(File sourceDir, String destinationDir) throws TransferFailedException {
364
365
366 List<PutFileContext> contexts = getPutFileContexts(sourceDir, destinationDir);
367 for (PutFileContext context : contexts) {
368
369 context.setProgress(null);
370 }
371
372
373 long bytes = sum(contexts);
374
375
376 log.info(getUploadStartMsg(contexts.size(), bytes));
377
378
379 ThreadHandlerContext<PutFileContext> thc = new ThreadHandlerContext<PutFileContext>();
380 thc.setList(contexts);
381 thc.setHandler(new FileHandler());
382 thc.setMax(maxThreads);
383 thc.setMin(minThreads);
384 thc.setDivisor(divisor);
385 thc.setListener(new PercentCompleteListener<PutFileContext>());
386
387
388 ExecutionStatistics stats = invoker.invokeThreads(thc);
389
390
391 long millis = stats.getExecutionTime();
392 long count = stats.getIterationCount();
393 log.info(getUploadCompleteMsg(millis, bytes, count));
394 }
395
396 protected String getUploadCompleteMsg(long millis, long bytes, long count) {
397 String rate = formatter.getRate(millis, bytes);
398 String time = formatter.getTime(millis);
399 StringBuilder sb = new StringBuilder();
400 sb.append("Files: " + count);
401 sb.append(" Time: " + time);
402 sb.append(" Rate: " + rate);
403 return sb.toString();
404 }
405
406 protected String getUploadStartMsg(int fileCount, long bytes) {
407 StringBuilder sb = new StringBuilder();
408 sb.append("Files: " + fileCount);
409 sb.append(" Bytes: " + formatter.getSize(bytes));
410 return sb.toString();
411 }
412
413 protected int getRequestsPerThread(int threads, int requests) {
414 int requestsPerThread = requests / threads;
415 while (requestsPerThread * threads < requests) {
416 requestsPerThread++;
417 }
418 return requestsPerThread;
419 }
420
421 protected long sum(List<PutFileContext> contexts) {
422 long sum = 0;
423 for (PutFileContext context : contexts) {
424 File file = context.getSource();
425 long length = file.length();
426 sum += length;
427 }
428 return sum;
429 }
430
431
432
433
434 @Override
435 protected void putResource(final File source, final String destination, final TransferProgress progress) throws IOException {
436
437
438 PutObjectRequest request = getPutObjectRequest(source, destination, progress);
439
440
441 Upload upload = transferManager.upload(request);
442 try {
443
444 upload.waitForCompletion();
445 } catch (Exception e) {
446 throw new IOException("Unexpected error uploading file", e);
447 }
448 }
449
450 protected String getDestinationPath(final String destination) {
451 return destination.substring(0, destination.lastIndexOf('/'));
452 }
453
454
455
456
457
458
459 protected String getBaseDir(final Repository source) {
460 StringBuilder sb = new StringBuilder(source.getBasedir());
461 sb.deleteCharAt(0);
462 if (sb.length() == 0) {
463 return "";
464 }
465 if (sb.charAt(sb.length() - 1) != '/') {
466 sb.append('/');
467 }
468 return sb.toString();
469 }
470
471 protected String getAuthenticationErrorMessage() {
472 StringBuffer sb = new StringBuffer();
473 sb.append("The S3 wagon needs AWS Access Key set as the username and AWS Secret Key set as the password. eg:\n");
474 sb.append("<server>\n");
475 sb.append(" <id>my.server</id>\n");
476 sb.append(" <username>[AWS Access Key ID]</username>\n");
477 sb.append(" <password>[AWS Secret Access Key]</password>\n");
478 sb.append("</server>\n");
479 return sb.toString();
480 }
481
482
483
484
485 protected AWSCredentials getCredentials(final AuthenticationInfo authenticationInfo) throws AuthenticationException {
486 if (authenticationInfo == null) {
487 throw new AuthenticationException(getAuthenticationErrorMessage());
488 }
489 String accessKey = authenticationInfo.getUserName();
490 String secretKey = authenticationInfo.getPassword();
491 if (accessKey == null || secretKey == null) {
492 throw new AuthenticationException(getAuthenticationErrorMessage());
493 }
494 return new BasicAWSCredentials(accessKey, secretKey);
495 }
496
497
498
499
500
501
502 @Override
503 protected PutFileContext getPutFileContext(File source, String destination) {
504 PutFileContext context = super.getPutFileContext(source, destination);
505 context.setFactory(this);
506 context.setTransferManager(this.transferManager);
507 return context;
508 }
509
510 protected int getMinThreads() {
511 return getValue(MIN_THREADS_KEY, DEFAULT_MIN_THREAD_COUNT);
512 }
513
514 protected int getMaxThreads() {
515 return getValue(MAX_THREADS_KEY, DEFAULT_MAX_THREAD_COUNT);
516 }
517
518 protected int getDivisor() {
519 return getValue(DIVISOR_KEY, DEFAULT_DIVISOR);
520 }
521
522 protected int getValue(String key, int defaultValue) {
523 String value = System.getProperty(key);
524 if (StringUtils.isEmpty(value)) {
525 return defaultValue;
526 } else {
527 return new Integer(value);
528 }
529 }
530
531 public int getReadTimeout() {
532 return readTimeout;
533 }
534
535 public void setReadTimeout(int readTimeout) {
536 this.readTimeout = readTimeout;
537 }
538
539 }