1
2
3
4
5
6
7
8
9 package org.kuali.maven.wagon;
10
11 import java.io.File;
12 import java.io.FileInputStream;
13 import java.io.FileNotFoundException;
14 import java.io.IOException;
15 import java.io.InputStream;
16 import java.io.OutputStream;
17 import java.net.URI;
18 import java.util.ArrayList;
19 import java.util.Date;
20 import java.util.List;
21
22 import org.apache.commons.io.IOUtils;
23 import org.apache.commons.lang.StringUtils;
24 import org.apache.maven.wagon.ResourceDoesNotExistException;
25 import org.apache.maven.wagon.TransferFailedException;
26 import org.apache.maven.wagon.authentication.AuthenticationException;
27 import org.apache.maven.wagon.authentication.AuthenticationInfo;
28 import org.apache.maven.wagon.proxy.ProxyInfo;
29 import org.apache.maven.wagon.repository.Repository;
30 import org.kuali.common.threads.ExecutionStatistics;
31 import org.kuali.common.threads.ThreadHandlerContext;
32 import org.kuali.common.threads.ThreadInvoker;
33 import org.kuali.common.threads.listener.PercentCompleteListener;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.amazonaws.AmazonClientException;
38 import com.amazonaws.AmazonServiceException;
39 import com.amazonaws.auth.AWSCredentials;
40 import com.amazonaws.auth.BasicAWSCredentials;
41 import com.amazonaws.services.s3.AmazonS3Client;
42 import com.amazonaws.services.s3.internal.Mimetypes;
43 import com.amazonaws.services.s3.model.Bucket;
44 import com.amazonaws.services.s3.model.CannedAccessControlList;
45 import com.amazonaws.services.s3.model.ObjectListing;
46 import com.amazonaws.services.s3.model.ObjectMetadata;
47 import com.amazonaws.services.s3.model.PutObjectRequest;
48 import com.amazonaws.services.s3.model.S3Object;
49 import com.amazonaws.services.s3.model.S3ObjectSummary;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 public class S3Wagon extends AbstractWagon implements RequestFactory {
/** Name of the system property read by {@link #getMinThreads()}. */
public static final String MIN_THREADS_KEY = "maven.wagon.threads.min";
/** Name of the system property read by {@link #getMaxThreads()}. */
public static final String MAX_THREADS_KEY = "maven.wagon.threads.max";
/** Name of the system property read by {@link #getDivisor()}. */
public static final String DIVISOR_KEY = "maven.wagon.threads.divisor";
/** Fallback used by {@link #getMinThreads()} when {@link #MIN_THREADS_KEY} is not set. */
public static final int DEFAULT_MIN_THREAD_COUNT = 10;
/** Fallback used by {@link #getMaxThreads()} when {@link #MAX_THREADS_KEY} is not set. */
public static final int DEFAULT_MAX_THREAD_COUNT = 50;
/** Fallback used by {@link #getDivisor()} when {@link #DIVISOR_KEY} is not set. */
public static final int DEFAULT_DIVISOR = 50;

// Executes the per-file handlers in putDirectory.
ThreadInvoker invoker = new ThreadInvoker();
// Produces the size, time and rate strings used in the upload log messages.
SimpleFormatter formatter = new SimpleFormatter();
// Threading settings resolved once per instance (system property or default) and handed to the
// ThreadHandlerContext in putDirectory. NOTE(review): how min/max/divisor combine into an actual
// thread count is decided by the kuali threads library, not by this class -- confirm there.
int minThreads = getMinThreads();
int maxThreads = getMaxThreads();
int divisor = getDivisor();

// NOTE: declared after the three fields above, so it is still null while their initializers run.
final Logger log = LoggerFactory.getLogger(S3Wagon.class);

// S3 client; assigned in connectToRepository.
private AmazonS3Client client;

// Bucket all operations target; assigned in connectToRepository from the repository host.
private Bucket bucket;

// Key prefix prepended to every resource name; assigned in connectToRepository via getBaseDir.
private String basedir;

// Used by getObjectMetadata to derive a content type from the destination name.
private final Mimetypes mimeTypes = Mimetypes.getInstance();
106
107 public S3Wagon() {
108 super(true);
109 S3Listener listener = new S3Listener();
110 super.addSessionListener(listener);
111 super.addTransferListener(listener);
112 }
113
114 protected Bucket getOrCreateBucket(final AmazonS3Client client, final String bucketName) {
115 List<Bucket> buckets = client.listBuckets();
116 for (Bucket bucket : buckets) {
117 if (bucket.getName().equals(bucketName)) {
118 return bucket;
119 }
120 }
121 return client.createBucket(bucketName);
122 }
123
124 @Override
125 protected void connectToRepository(final Repository source, final AuthenticationInfo authenticationInfo,
126 final ProxyInfo proxyInfo) throws AuthenticationException {
127
128 AWSCredentials credentials = getCredentials(authenticationInfo);
129 client = new AmazonS3Client(credentials);
130 bucket = getOrCreateBucket(client, source.getHost());
131 basedir = getBaseDir(source);
132 }
133
134 @Override
135 protected boolean doesRemoteResourceExist(final String resourceName) {
136 try {
137 client.getObjectMetadata(bucket.getName(), basedir + resourceName);
138 } catch (AmazonClientException e1) {
139 return false;
140 }
141 return true;
142 }
143
/**
 * No-op: this implementation does nothing when the wagon disconnects.
 */
@Override
protected void disconnectFromRepository() {
    // nothing to release here
}
148
149
150
151
152 @Override
153 protected void getResource(final String resourceName, final File destination, final TransferProgress progress)
154 throws ResourceDoesNotExistException, IOException {
155
156 S3Object object = null;
157 try {
158 String key = basedir + resourceName;
159 object = client.getObject(bucket.getName(), key);
160 } catch (Exception e) {
161 throw new ResourceDoesNotExistException("Resource " + resourceName + " does not exist in the repository", e);
162 }
163
164
165 InputStream in = null;
166 OutputStream out = null;
167 try {
168 in = object.getObjectContent();
169 out = new TransferProgressFileOutputStream(destination, progress);
170 byte[] buffer = new byte[1024];
171 int length;
172 while ((length = in.read(buffer)) != -1) {
173 out.write(buffer, 0, length);
174 }
175 } finally {
176 IOUtils.closeQuietly(in);
177 IOUtils.closeQuietly(out);
178 }
179 }
180
181
182
183
184 @Override
185 protected boolean isRemoteResourceNewer(final String resourceName, final long timestamp) {
186 ObjectMetadata metadata = client.getObjectMetadata(bucket.getName(), basedir + resourceName);
187 return metadata.getLastModified().compareTo(new Date(timestamp)) < 0;
188 }
189
190
191
192
193 @Override
194 protected List<String> listDirectory(final String directory) throws Exception {
195 ObjectListing objectListing = client.listObjects(bucket.getName(), basedir + directory);
196 List<String> fileNames = new ArrayList<String>();
197 for (S3ObjectSummary summary : objectListing.getObjectSummaries()) {
198 fileNames.add(summary.getKey());
199 }
200 return fileNames;
201 }
202
203
204
205
206
207
208
209
210 protected String getNormalizedKey(final File source, final String destination) {
211
212 String key = basedir + destination;
213 try {
214 String prefix = "http://s3.amazonaws.com/" + bucket.getName() + "/";
215 String urlString = prefix + key;
216 URI rawURI = new URI(urlString);
217 URI normalizedURI = rawURI.normalize();
218 String normalized = normalizedURI.toString();
219 int pos = normalized.indexOf(prefix) + prefix.length();
220 String normalizedKey = normalized.substring(pos);
221 return normalizedKey;
222 } catch (Exception e) {
223 throw new RuntimeException(e);
224 }
225 }
226
227 protected ObjectMetadata getObjectMetadata(final File source, final String destination) {
228
229 String contentType = mimeTypes.getMimetype(destination);
230 long contentLength = source.length();
231
232 ObjectMetadata omd = new ObjectMetadata();
233 omd.setContentLength(contentLength);
234 omd.setContentType(contentType);
235 return omd;
236 }
237
238
239
240
241 public PutObjectRequest getPutObjectRequest(PutFileContext context) {
242 File source = context.getSource();
243 String destination = context.getDestination();
244 TransferProgress progress = context.getProgress();
245 return getPutObjectRequest(source, destination, progress);
246 }
247
248 protected InputStream getInputStream(File source, TransferProgress progress) throws FileNotFoundException {
249 if (progress == null) {
250 return new FileInputStream(source);
251 } else {
252 return new TransferProgressFileInputStream(source, progress);
253 }
254 }
255
256
257
258
259 protected PutObjectRequest getPutObjectRequest(File source, String destination, TransferProgress progress) {
260 try {
261 String key = getNormalizedKey(source, destination);
262 String bucketName = bucket.getName();
263 InputStream input = getInputStream(source, progress);
264 ObjectMetadata metadata = getObjectMetadata(source, destination);
265 PutObjectRequest request = new PutObjectRequest(bucketName, key, input, metadata);
266 request.setCannedAcl(CannedAccessControlList.PublicRead);
267 return request;
268 } catch (FileNotFoundException e) {
269 throw new AmazonServiceException("File not found", e);
270 }
271 }
272
273
274
275
276
277
278
279 public final void putDirectory(File sourceDir, String destinationDir) throws TransferFailedException {
280
281
282 List<PutFileContext> contexts = getPutFileContexts(sourceDir, destinationDir);
283 for (PutFileContext context : contexts) {
284
285 context.setProgress(null);
286 }
287
288
289 long bytes = sum(contexts);
290
291
292 log.info(getUploadStartMsg(contexts.size(), bytes));
293
294
295 ThreadHandlerContext<PutFileContext> thc = new ThreadHandlerContext<PutFileContext>();
296 thc.setList(contexts);
297 thc.setHandler(new FileHandler());
298 thc.setMax(maxThreads);
299 thc.setMin(minThreads);
300 thc.setDivisor(divisor);
301 thc.setListener(new PercentCompleteListener<PutFileContext>());
302
303
304 ExecutionStatistics stats = invoker.invokeThreads(thc);
305
306
307 long millis = stats.getExecutionTime();
308 long count = stats.getIterationCount();
309 log.info(getUploadCompleteMsg(millis, bytes, count));
310 }
311
312 protected String getUploadCompleteMsg(long millis, long bytes, long count) {
313 String rate = formatter.getRate(millis, bytes);
314 String time = formatter.getTime(millis);
315 StringBuilder sb = new StringBuilder();
316 sb.append("Files: " + count);
317 sb.append(" Time: " + time);
318 sb.append(" Rate: " + rate);
319 return sb.toString();
320 }
321
322 protected String getUploadStartMsg(int fileCount, long bytes) {
323 StringBuilder sb = new StringBuilder();
324 sb.append("Files: " + fileCount);
325 sb.append(" Bytes: " + formatter.getSize(bytes));
326 return sb.toString();
327 }
328
329 protected int getRequestsPerThread(int threads, int requests) {
330 int requestsPerThread = requests / threads;
331 while (requestsPerThread * threads < requests) {
332 requestsPerThread++;
333 }
334 return requestsPerThread;
335 }
336
337 protected long sum(List<PutFileContext> contexts) {
338 long sum = 0;
339 for (PutFileContext context : contexts) {
340 File file = context.getSource();
341 long length = file.length();
342 sum += length;
343 }
344 return sum;
345 }
346
347
348
349
350 @Override
351 protected void putResource(final File source, final String destination, final TransferProgress progress)
352 throws IOException {
353
354
355 PutObjectRequest request = getPutObjectRequest(source, destination, progress);
356
357
358 client.putObject(request);
359 }
360
361 protected String getDestinationPath(final String destination) {
362 return destination.substring(0, destination.lastIndexOf('/'));
363 }
364
365
366
367
368
369
370 protected String getBaseDir(final Repository source) {
371 StringBuilder sb = new StringBuilder(source.getBasedir());
372 sb.deleteCharAt(0);
373 if (sb.length() == 0) {
374 return "";
375 }
376 if (sb.charAt(sb.length() - 1) != '/') {
377 sb.append('/');
378 }
379 return sb.toString();
380 }
381
382 protected String getAuthenticationErrorMessage() {
383 StringBuffer sb = new StringBuffer();
384 sb.append("The S3 wagon needs AWS Access Key set as the username and AWS Secret Key set as the password. eg:\n");
385 sb.append("<server>\n");
386 sb.append(" <id>my.server</id>\n");
387 sb.append(" <username>[AWS Access Key ID]</username>\n");
388 sb.append(" <password>[AWS Secret Access Key]</password>\n");
389 sb.append("</server>\n");
390 return sb.toString();
391 }
392
393
394
395
396 protected AWSCredentials getCredentials(final AuthenticationInfo authenticationInfo) throws AuthenticationException {
397 if (authenticationInfo == null) {
398 throw new AuthenticationException(getAuthenticationErrorMessage());
399 }
400 String accessKey = authenticationInfo.getUserName();
401 String secretKey = authenticationInfo.getPassword();
402 if (accessKey == null || secretKey == null) {
403 throw new AuthenticationException(getAuthenticationErrorMessage());
404 }
405 return new BasicAWSCredentials(accessKey, secretKey);
406 }
407
408 @Override
409 protected PutFileContext getPutFileContext(File source, String destination) {
410 PutFileContext context = super.getPutFileContext(source, destination);
411 context.setClient(client);
412 context.setFactory(this);
413 return context;
414 }
415
/**
 * Resolves the minimum thread setting.
 *
 * @return the integer value of the {@link #MIN_THREADS_KEY} system property, or
 *         {@link #DEFAULT_MIN_THREAD_COUNT} when the property is unset or empty
 */
protected int getMinThreads() {
    return getValue(MIN_THREADS_KEY, DEFAULT_MIN_THREAD_COUNT);
}
419
/**
 * Resolves the maximum thread setting.
 *
 * @return the integer value of the {@link #MAX_THREADS_KEY} system property, or
 *         {@link #DEFAULT_MAX_THREAD_COUNT} when the property is unset or empty
 */
protected int getMaxThreads() {
    return getValue(MAX_THREADS_KEY, DEFAULT_MAX_THREAD_COUNT);
}
423
/**
 * Resolves the divisor setting that putDirectory passes to the thread handler context.
 *
 * @return the integer value of the {@link #DIVISOR_KEY} system property, or
 *         {@link #DEFAULT_DIVISOR} when the property is unset or empty
 */
protected int getDivisor() {
    return getValue(DIVISOR_KEY, DEFAULT_DIVISOR);
}
427
428 protected int getValue(String key, int defaultValue) {
429 String value = System.getProperty(key);
430 if (StringUtils.isEmpty(value)) {
431 return defaultValue;
432 } else {
433 return new Integer(value);
434 }
435 }
436
437 }