1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.kuali.maven.wagon;
17
18 import java.io.File;
19 import java.io.FileInputStream;
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.net.URI;
25 import java.util.ArrayList;
26 import java.util.Date;
27 import java.util.List;
28
29 import org.apache.commons.io.IOUtils;
30 import org.apache.commons.lang.StringUtils;
31 import org.apache.maven.wagon.ResourceDoesNotExistException;
32 import org.apache.maven.wagon.TransferFailedException;
33 import org.apache.maven.wagon.authentication.AuthenticationException;
34 import org.apache.maven.wagon.authentication.AuthenticationInfo;
35 import org.apache.maven.wagon.proxy.ProxyInfo;
36 import org.apache.maven.wagon.repository.Repository;
37 import org.kuali.common.threads.ExecutionStatistics;
38 import org.kuali.common.threads.ThreadHandlerContext;
39 import org.kuali.common.threads.ThreadInvoker;
40 import org.kuali.common.threads.listener.PercentCompleteListener;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.amazonaws.AmazonClientException;
45 import com.amazonaws.AmazonServiceException;
46 import com.amazonaws.auth.AWSCredentials;
47 import com.amazonaws.auth.BasicAWSCredentials;
48 import com.amazonaws.services.s3.AmazonS3Client;
49 import com.amazonaws.services.s3.internal.Mimetypes;
50 import com.amazonaws.services.s3.model.Bucket;
51 import com.amazonaws.services.s3.model.CannedAccessControlList;
52 import com.amazonaws.services.s3.model.ObjectListing;
53 import com.amazonaws.services.s3.model.ObjectMetadata;
54 import com.amazonaws.services.s3.model.PutObjectRequest;
55 import com.amazonaws.services.s3.model.S3Object;
56 import com.amazonaws.services.s3.model.S3ObjectSummary;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 public class S3Wagon extends AbstractWagon implements RequestFactory {
91 public static final String MIN_THREADS_KEY = "maven.wagon.threads.min";
92 public static final String MAX_THREADS_KEY = "maven.wagon.threads.max";
93 public static final String DIVISOR_KEY = "maven.wagon.threads.divisor";
94 public static final int DEFAULT_MIN_THREAD_COUNT = 10;
95 public static final int DEFAULT_MAX_THREAD_COUNT = 50;
96 public static final int DEFAULT_DIVISOR = 50;
97 public static final int DEFAULT_READ_TIMEOUT = 60 * 1000;
98
99 ThreadInvoker invoker = new ThreadInvoker();
100 SimpleFormatter formatter = new SimpleFormatter();
101 int minThreads = getMinThreads();
102 int maxThreads = getMaxThreads();
103 int divisor = getDivisor();
104 int readTimeout = DEFAULT_READ_TIMEOUT;
105
106 final Logger log = LoggerFactory.getLogger(S3Wagon.class);
107
108 private AmazonS3Client client;
109
110 private Bucket bucket;
111
112 private String basedir;
113
114 private final Mimetypes mimeTypes = Mimetypes.getInstance();
115
116 public S3Wagon() {
117 super(true);
118 S3Listener listener = new S3Listener();
119 super.addSessionListener(listener);
120 super.addTransferListener(listener);
121 }
122
123 protected Bucket getOrCreateBucket(final AmazonS3Client client, final String bucketName) {
124 List<Bucket> buckets = client.listBuckets();
125 for (Bucket bucket : buckets) {
126 if (bucket.getName().equals(bucketName)) {
127 return bucket;
128 }
129 }
130 return client.createBucket(bucketName);
131 }
132
133 @Override
134 protected void connectToRepository(final Repository source, final AuthenticationInfo authenticationInfo,
135 final ProxyInfo proxyInfo) throws AuthenticationException {
136
137 AWSCredentials credentials = getCredentials(authenticationInfo);
138 client = new AmazonS3Client(credentials);
139 bucket = getOrCreateBucket(client, source.getHost());
140 basedir = getBaseDir(source);
141 }
142
143 @Override
144 protected boolean doesRemoteResourceExist(final String resourceName) {
145 try {
146 client.getObjectMetadata(bucket.getName(), basedir + resourceName);
147 } catch (AmazonClientException e1) {
148 return false;
149 }
150 return true;
151 }
152
153 @Override
154 protected void disconnectFromRepository() {
155
156 }
157
158
159
160
161 @Override
162 protected void getResource(final String resourceName, final File destination, final TransferProgress progress)
163 throws ResourceDoesNotExistException, IOException {
164
165 S3Object object = null;
166 try {
167 String key = basedir + resourceName;
168 object = client.getObject(bucket.getName(), key);
169 } catch (Exception e) {
170 throw new ResourceDoesNotExistException("Resource " + resourceName + " does not exist in the repository", e);
171 }
172
173
174 InputStream in = null;
175 OutputStream out = null;
176 try {
177 in = object.getObjectContent();
178 out = new TransferProgressFileOutputStream(destination, progress);
179 byte[] buffer = new byte[1024];
180 int length;
181 while ((length = in.read(buffer)) != -1) {
182 out.write(buffer, 0, length);
183 }
184 } finally {
185 IOUtils.closeQuietly(in);
186 IOUtils.closeQuietly(out);
187 }
188 }
189
190
191
192
193 @Override
194 protected boolean isRemoteResourceNewer(final String resourceName, final long timestamp) {
195 ObjectMetadata metadata = client.getObjectMetadata(bucket.getName(), basedir + resourceName);
196 return metadata.getLastModified().compareTo(new Date(timestamp)) < 0;
197 }
198
199
200
201
202 @Override
203 protected List<String> listDirectory(final String directory) throws Exception {
204 ObjectListing objectListing = client.listObjects(bucket.getName(), basedir + directory);
205 List<String> fileNames = new ArrayList<String>();
206 for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
207 fileNames.add(summary.getKey());
208 }
209 return fileNames;
210 }
211
212
213
214
215
216
217
218
219 protected String getNormalizedKey(final File source, final String destination) {
220
221 String key = basedir + destination;
222 try {
223 String prefix = "http://s3.amazonaws.com/" + bucket.getName() + "/";
224 String urlString = prefix + key;
225 URI rawURI = new URI(urlString);
226 URI normalizedURI = rawURI.normalize();
227 String normalized = normalizedURI.toString();
228 int pos = normalized.indexOf(prefix) + prefix.length();
229 String normalizedKey = normalized.substring(pos);
230 return normalizedKey;
231 } catch (Exception e) {
232 throw new RuntimeException(e);
233 }
234 }
235
236 protected ObjectMetadata getObjectMetadata(final File source, final String destination) {
237
238 String contentType = mimeTypes.getMimetype(destination);
239 long contentLength = source.length();
240
241 ObjectMetadata omd = new ObjectMetadata();
242 omd.setContentLength(contentLength);
243 omd.setContentType(contentType);
244 return omd;
245 }
246
247
248
249
250 public PutObjectRequest getPutObjectRequest(PutFileContext context) {
251 File source = context.getSource();
252 String destination = context.getDestination();
253 TransferProgress progress = context.getProgress();
254 return getPutObjectRequest(source, destination, progress);
255 }
256
257 protected InputStream getInputStream(File source, TransferProgress progress) throws FileNotFoundException {
258 if (progress == null) {
259 return new FileInputStream(source);
260 } else {
261 return new TransferProgressFileInputStream(source, progress);
262 }
263 }
264
265
266
267
268 protected PutObjectRequest getPutObjectRequest(File source, String destination, TransferProgress progress) {
269 try {
270 String key = getNormalizedKey(source, destination);
271 String bucketName = bucket.getName();
272 InputStream input = getInputStream(source, progress);
273 ObjectMetadata metadata = getObjectMetadata(source, destination);
274 PutObjectRequest request = new PutObjectRequest(bucketName, key, input, metadata);
275 request.setCannedAcl(CannedAccessControlList.PublicRead);
276 return request;
277 } catch (FileNotFoundException e) {
278 throw new AmazonServiceException("File not found", e);
279 }
280 }
281
282
283
284
285
286
287
288 public final void putDirectory(File sourceDir, String destinationDir) throws TransferFailedException {
289
290
291 List<PutFileContext> contexts = getPutFileContexts(sourceDir, destinationDir);
292 for (PutFileContext context : contexts) {
293
294 context.setProgress(null);
295 }
296
297
298 long bytes = sum(contexts);
299
300
301 log.info(getUploadStartMsg(contexts.size(), bytes));
302
303
304 ThreadHandlerContext<PutFileContext> thc = new ThreadHandlerContext<PutFileContext>();
305 thc.setList(contexts);
306 thc.setHandler(new FileHandler());
307 thc.setMax(maxThreads);
308 thc.setMin(minThreads);
309 thc.setDivisor(divisor);
310 thc.setListener(new PercentCompleteListener<PutFileContext>());
311
312
313 ExecutionStatistics stats = invoker.invokeThreads(thc);
314
315
316 long millis = stats.getExecutionTime();
317 long count = stats.getIterationCount();
318 log.info(getUploadCompleteMsg(millis, bytes, count));
319 }
320
321 protected String getUploadCompleteMsg(long millis, long bytes, long count) {
322 String rate = formatter.getRate(millis, bytes);
323 String time = formatter.getTime(millis);
324 StringBuilder sb = new StringBuilder();
325 sb.append("Files: " + count);
326 sb.append(" Time: " + time);
327 sb.append(" Rate: " + rate);
328 return sb.toString();
329 }
330
331 protected String getUploadStartMsg(int fileCount, long bytes) {
332 StringBuilder sb = new StringBuilder();
333 sb.append("Files: " + fileCount);
334 sb.append(" Bytes: " + formatter.getSize(bytes));
335 return sb.toString();
336 }
337
338 protected int getRequestsPerThread(int threads, int requests) {
339 int requestsPerThread = requests / threads;
340 while (requestsPerThread * threads < requests) {
341 requestsPerThread++;
342 }
343 return requestsPerThread;
344 }
345
346 protected long sum(List<PutFileContext> contexts) {
347 long sum = 0;
348 for (PutFileContext context : contexts) {
349 File file = context.getSource();
350 long length = file.length();
351 sum += length;
352 }
353 return sum;
354 }
355
356
357
358
359 @Override
360 protected void putResource(final File source, final String destination, final TransferProgress progress)
361 throws IOException {
362
363
364 PutObjectRequest request = getPutObjectRequest(source, destination, progress);
365
366
367 client.putObject(request);
368 }
369
370 protected String getDestinationPath(final String destination) {
371 return destination.substring(0, destination.lastIndexOf('/'));
372 }
373
374
375
376
377
378
379 protected String getBaseDir(final Repository source) {
380 StringBuilder sb = new StringBuilder(source.getBasedir());
381 sb.deleteCharAt(0);
382 if (sb.length() == 0) {
383 return "";
384 }
385 if (sb.charAt(sb.length() - 1) != '/') {
386 sb.append('/');
387 }
388 return sb.toString();
389 }
390
391 protected String getAuthenticationErrorMessage() {
392 StringBuffer sb = new StringBuffer();
393 sb.append("The S3 wagon needs AWS Access Key set as the username and AWS Secret Key set as the password. eg:\n");
394 sb.append("<server>\n");
395 sb.append(" <id>my.server</id>\n");
396 sb.append(" <username>[AWS Access Key ID]</username>\n");
397 sb.append(" <password>[AWS Secret Access Key]</password>\n");
398 sb.append("</server>\n");
399 return sb.toString();
400 }
401
402
403
404
405 protected AWSCredentials getCredentials(final AuthenticationInfo authenticationInfo) throws AuthenticationException {
406 if (authenticationInfo == null) {
407 throw new AuthenticationException(getAuthenticationErrorMessage());
408 }
409 String accessKey = authenticationInfo.getUserName();
410 String secretKey = authenticationInfo.getPassword();
411 if (accessKey == null || secretKey == null) {
412 throw new AuthenticationException(getAuthenticationErrorMessage());
413 }
414 return new BasicAWSCredentials(accessKey, secretKey);
415 }
416
417
418
419
420
421
422 @Override
423 protected PutFileContext getPutFileContext(File source, String destination) {
424 PutFileContext context = super.getPutFileContext(source, destination);
425 context.setClient(client);
426 context.setFactory(this);
427 return context;
428 }
429
430 protected int getMinThreads() {
431 return getValue(MIN_THREADS_KEY, DEFAULT_MIN_THREAD_COUNT);
432 }
433
434 protected int getMaxThreads() {
435 return getValue(MAX_THREADS_KEY, DEFAULT_MAX_THREAD_COUNT);
436 }
437
438 protected int getDivisor() {
439 return getValue(DIVISOR_KEY, DEFAULT_DIVISOR);
440 }
441
442 protected int getValue(String key, int defaultValue) {
443 String value = System.getProperty(key);
444 if (StringUtils.isEmpty(value)) {
445 return defaultValue;
446 } else {
447 return new Integer(value);
448 }
449 }
450
451 public int getReadTimeout() {
452 return readTimeout;
453 }
454
455 public void setReadTimeout(int readTimeout) {
456 this.readTimeout = readTimeout;
457 }
458
459 }