1
2
3
4
5
6
7
8
9 package org.kuali.maven.wagon;
10
11 import java.io.File;
12 import java.io.FileInputStream;
13 import java.io.FileNotFoundException;
14 import java.io.IOException;
15 import java.io.InputStream;
16 import java.io.OutputStream;
17 import java.net.URI;
18 import java.util.ArrayList;
19 import java.util.Date;
20 import java.util.List;
21
22 import org.apache.commons.io.IOUtils;
23 import org.apache.commons.lang.StringUtils;
24 import org.apache.maven.wagon.ResourceDoesNotExistException;
25 import org.apache.maven.wagon.TransferFailedException;
26 import org.apache.maven.wagon.authentication.AuthenticationException;
27 import org.apache.maven.wagon.authentication.AuthenticationInfo;
28 import org.apache.maven.wagon.proxy.ProxyInfo;
29 import org.apache.maven.wagon.repository.Repository;
30 import org.kuali.common.threads.ExecutionStatistics;
31 import org.kuali.common.threads.ThreadHandlerContext;
32 import org.kuali.common.threads.ThreadInvoker;
33 import org.kuali.common.threads.listener.PercentCompleteListener;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.amazonaws.AmazonClientException;
38 import com.amazonaws.AmazonServiceException;
39 import com.amazonaws.auth.AWSCredentials;
40 import com.amazonaws.auth.BasicAWSCredentials;
41 import com.amazonaws.services.s3.AmazonS3Client;
42 import com.amazonaws.services.s3.internal.Mimetypes;
43 import com.amazonaws.services.s3.model.Bucket;
44 import com.amazonaws.services.s3.model.CannedAccessControlList;
45 import com.amazonaws.services.s3.model.ObjectListing;
46 import com.amazonaws.services.s3.model.ObjectMetadata;
47 import com.amazonaws.services.s3.model.PutObjectRequest;
48 import com.amazonaws.services.s3.model.S3Object;
49 import com.amazonaws.services.s3.model.S3ObjectSummary;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public class S3Wagon extends AbstractWagon implements RequestFactory {
84 public static final String MIN_THREADS_KEY = "maven.wagon.threads.min";
85 public static final String MAX_THREADS_KEY = "maven.wagon.threads.max";
86 public static final String DIVISOR_KEY = "maven.wagon.threads.divisor";
87 public static final int DEFAULT_MIN_THREAD_COUNT = 10;
88 public static final int DEFAULT_MAX_THREAD_COUNT = 50;
89 public static final int DEFAULT_DIVISOR = 50;
90
91 ThreadInvoker invoker = new ThreadInvoker();
92 SimpleFormatter formatter = new SimpleFormatter();
93 int minThreads = getMinThreads();
94 int maxThreads = getMaxThreads();
95 int divisor = getDivisor();
96
97 final Logger log = LoggerFactory.getLogger(S3Wagon.class);
98
99 private AmazonS3Client client;
100
101 private Bucket bucket;
102
103 private String basedir;
104
105 private final Mimetypes mimeTypes = Mimetypes.getInstance();
106
107 public S3Wagon() {
108 super(true);
109 S3Listener listener = new S3Listener();
110 super.addSessionListener(listener);
111 super.addTransferListener(listener);
112 }
113
114 protected Bucket getOrCreateBucket(final AmazonS3Client client, final String bucketName) {
115 List<Bucket> buckets = client.listBuckets();
116 for (Bucket bucket : buckets) {
117 if (bucket.getName().equals(bucketName)) {
118 return bucket;
119 }
120 }
121 return client.createBucket(bucketName);
122 }
123
124 @Override
125 protected void connectToRepository(final Repository source, final AuthenticationInfo authenticationInfo,
126 final ProxyInfo proxyInfo) throws AuthenticationException {
127
128 AWSCredentials credentials = getCredentials(authenticationInfo);
129 client = new AmazonS3Client(credentials);
130 bucket = getOrCreateBucket(client, source.getHost());
131 basedir = getBaseDir(source);
132 }
133
134 @Override
135 protected boolean doesRemoteResourceExist(final String resourceName) {
136 try {
137 client.getObjectMetadata(bucket.getName(), basedir + resourceName);
138 } catch (AmazonClientException e1) {
139 return false;
140 }
141 return true;
142 }
143
144 @Override
145 protected void disconnectFromRepository() {
146
147 }
148
149
150
151
152 @Override
153 protected void getResource(final String resourceName, final File destination, final TransferProgress progress)
154 throws ResourceDoesNotExistException, IOException {
155
156 S3Object object = null;
157 try {
158 String key = basedir + resourceName;
159 object = client.getObject(bucket.getName(), key);
160 } catch (Exception e) {
161 throw new ResourceDoesNotExistException("Resource " + resourceName + " does not exist in the repository", e);
162 }
163
164
165 InputStream in = null;
166 OutputStream out = null;
167 try {
168 in = object.getObjectContent();
169 out = new TransferProgressFileOutputStream(destination, progress);
170 byte[] buffer = new byte[1024];
171 int length;
172 while ((length = in.read(buffer)) != -1) {
173 out.write(buffer, 0, length);
174 }
175 } finally {
176 IOUtils.closeQuietly(in);
177 IOUtils.closeQuietly(out);
178 }
179 }
180
181
182
183
184 @Override
185 protected boolean isRemoteResourceNewer(final String resourceName, final long timestamp) {
186 ObjectMetadata metadata = client.getObjectMetadata(bucket.getName(), basedir + resourceName);
187 return metadata.getLastModified().compareTo(new Date(timestamp)) < 0;
188 }
189
190
191
192
193 @Override
194 protected List<String> listDirectory(final String directory) throws Exception {
195 ObjectListing objectListing = client.listObjects(bucket.getName(), basedir + directory);
196 List<String> fileNames = new ArrayList<String>();
197 for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
198 fileNames.add(summary.getKey());
199 }
200 return fileNames;
201 }
202
203
204
205
206
207
208
209
210 protected String getNormalizedKey(final File source, final String destination) {
211
212 String key = basedir + destination;
213 try {
214 String prefix = "http://s3.amazonaws.com/" + bucket.getName() + "/";
215 String urlString = prefix + key;
216 URI rawURI = new URI(urlString);
217 URI normalizedURI = rawURI.normalize();
218 String normalized = normalizedURI.toString();
219 int pos = normalized.indexOf(prefix) + prefix.length();
220 String normalizedKey = normalized.substring(pos);
221 return normalizedKey;
222 } catch (Exception e) {
223 throw new RuntimeException(e);
224 }
225 }
226
227 protected ObjectMetadata getObjectMetadata(final File source, final String destination) {
228
229 String contentType = mimeTypes.getMimetype(destination);
230 long contentLength = source.length();
231
232 ObjectMetadata omd = new ObjectMetadata();
233 omd.setContentLength(contentLength);
234 omd.setContentType(contentType);
235 return omd;
236 }
237
238
239
240
241 public PutObjectRequest getPutObjectRequest(PutFileContext context) {
242 File source = context.getSource();
243 String destination = context.getDestination();
244 TransferProgress progress = context.getProgress();
245 return getPutObjectRequest(source, destination, progress);
246 }
247
248 protected InputStream getInputStream(File source, TransferProgress progress) throws FileNotFoundException {
249 if (progress == null) {
250 return new FileInputStream(source);
251 } else {
252 return new TransferProgressFileInputStream(source, progress);
253 }
254 }
255
256
257
258
259 protected PutObjectRequest getPutObjectRequest(File source, String destination, TransferProgress progress) {
260 try {
261 String key = getNormalizedKey(source, destination);
262 String bucketName = bucket.getName();
263 InputStream input = getInputStream(source, progress);
264 ObjectMetadata metadata = getObjectMetadata(source, destination);
265 PutObjectRequest request = new PutObjectRequest(bucketName, key, input, metadata);
266 request.setCannedAcl(CannedAccessControlList.PublicRead);
267 return request;
268 } catch (FileNotFoundException e) {
269 throw new AmazonServiceException("File not found", e);
270 }
271 }
272
273
274
275
276
277
278
279 public final void putDirectory(File sourceDir, String destinationDir) throws TransferFailedException {
280
281
282 List<PutFileContext> contexts = getPutFileContexts(sourceDir, destinationDir);
283 for (PutFileContext context : contexts) {
284
285 context.setProgress(null);
286 }
287
288
289 long bytes = sum(contexts);
290
291
292 log.info("Uploading - " + sourceDir.getAbsolutePath());
293 log.info(getUploadStartMsg(contexts.size(), bytes));
294
295
296 ThreadHandlerContext<PutFileContext> thc = new ThreadHandlerContext<PutFileContext>();
297 thc.setList(contexts);
298 thc.setHandler(new FileHandler());
299 thc.setMax(maxThreads);
300 thc.setMin(minThreads);
301 thc.setDivisor(divisor);
302 thc.setListener(new PercentCompleteListener<PutFileContext>());
303
304
305 ExecutionStatistics stats = invoker.invokeThreads(thc);
306
307
308 long millis = stats.getExecutionTime();
309 long count = stats.getIterationCount();
310 log.info(getUploadCompleteMsg(millis, bytes, count));
311 }
312
313 protected String getUploadCompleteMsg(long millis, long bytes, long count) {
314 String rate = formatter.getRate(millis, bytes);
315 String time = formatter.getTime(millis);
316 StringBuilder sb = new StringBuilder();
317 sb.append("Files: " + count);
318 sb.append(" Time: " + time);
319 sb.append(" Rate: " + rate);
320 return sb.toString();
321 }
322
323 protected String getUploadStartMsg(int fileCount, long bytes) {
324 StringBuilder sb = new StringBuilder();
325 sb.append("Files: " + fileCount);
326 sb.append(" Bytes: " + formatter.getSize(bytes));
327 return sb.toString();
328 }
329
330 protected int getRequestsPerThread(int threads, int requests) {
331 int requestsPerThread = requests / threads;
332 while (requestsPerThread * threads < requests) {
333 requestsPerThread++;
334 }
335 return requestsPerThread;
336 }
337
338 protected long sum(List<PutFileContext> contexts) {
339 long sum = 0;
340 for (PutFileContext context : contexts) {
341 File file = context.getSource();
342 long length = file.length();
343 sum += length;
344 }
345 return sum;
346 }
347
348
349
350
351 @Override
352 protected void putResource(final File source, final String destination, final TransferProgress progress)
353 throws IOException {
354
355
356 PutObjectRequest request = getPutObjectRequest(source, destination, progress);
357
358
359 client.putObject(request);
360 }
361
362 protected String getDestinationPath(final String destination) {
363 return destination.substring(0, destination.lastIndexOf('/'));
364 }
365
366
367
368
369
370
371 protected String getBaseDir(final Repository source) {
372 StringBuilder sb = new StringBuilder(source.getBasedir());
373 sb.deleteCharAt(0);
374 if (sb.length() == 0) {
375 return "";
376 }
377 if (sb.charAt(sb.length() - 1) != '/') {
378 sb.append('/');
379 }
380 return sb.toString();
381 }
382
383 protected String getAuthenticationErrorMessage() {
384 StringBuffer sb = new StringBuffer();
385 sb.append("The S3 wagon needs AWS Access Key set as the username and AWS Secret Key set as the password. eg:\n");
386 sb.append("<server>\n");
387 sb.append(" <id>my.server</id>\n");
388 sb.append(" <username>[AWS Access Key ID]</username>\n");
389 sb.append(" <password>[AWS Secret Access Key]</password>\n");
390 sb.append("</server>\n");
391 return sb.toString();
392 }
393
394
395
396
397 protected AWSCredentials getCredentials(final AuthenticationInfo authenticationInfo) throws AuthenticationException {
398 if (authenticationInfo == null) {
399 throw new AuthenticationException(getAuthenticationErrorMessage());
400 }
401 String accessKey = authenticationInfo.getUserName();
402 String secretKey = authenticationInfo.getPassword();
403 if (accessKey == null || secretKey == null) {
404 throw new AuthenticationException(getAuthenticationErrorMessage());
405 }
406 return new BasicAWSCredentials(accessKey, secretKey);
407 }
408
409 @Override
410 protected PutFileContext getPutFileContext(File source, String destination) {
411 PutFileContext context = super.getPutFileContext(source, destination);
412 context.setClient(client);
413 context.setFactory(this);
414 return context;
415 }
416
417 protected int getMinThreads() {
418 return getValue(MIN_THREADS_KEY, DEFAULT_MIN_THREAD_COUNT);
419 }
420
421 protected int getMaxThreads() {
422 return getValue(MAX_THREADS_KEY, DEFAULT_MAX_THREAD_COUNT);
423 }
424
425 protected int getDivisor() {
426 return getValue(DIVISOR_KEY, DEFAULT_DIVISOR);
427 }
428
429 protected int getValue(String key, int defaultValue) {
430 String value = System.getProperty(key);
431 if (StringUtils.isEmpty(value)) {
432 return defaultValue;
433 } else {
434 return new Integer(value);
435 }
436 }
437
438 }