diff --git a/aws-blog-serverless-cf-analysis/Config b/aws-blog-serverless-cf-analysis/Config new file mode 100644 index 00000000..31c76714 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/Config @@ -0,0 +1,37 @@ +# -*-perl-*- + +package.Aws-lambda-athena = { + interfaces = (1.0); + + deploy = { + generic = true; + }; + + build-environment = { + chroot = basic; + network-access = blocked; + }; + + # Use NoOpBuild. See https://w.amazon.com/index.php/BrazilBuildSystem/NoOpBuild + build-system = no-op; + build-tools = { + 1.0 = { + NoOpBuild = 1.0; + }; + }; + + # Use runtime-dependencies for when you want to bring in additional + # packages when deploying. + # Use dependencies instead if you intend for these dependencies to + # be exported to other packages that build against you. + dependencies = { + 1.0 = { + }; + }; + + runtime-dependencies = { + 1.0 = { + }; + }; + +}; diff --git a/aws-blog-serverless-cf-analysis/pom.xml b/aws-blog-serverless-cf-analysis/pom.xml new file mode 100644 index 00000000..36eff1e3 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/pom.xml @@ -0,0 +1,94 @@ + + + 4.0.0 + + com.amazonaws.services.lambda + aws-lambda-athena + 1.0.0 + + + UTF-8 + + + + + + com.amazonaws + aws-lambda-java-core + 1.1.0 + + + + com.amazonaws + aws-lambda-java-events + 1.3.0 + + + + + + com.amazonaws + athena.jdbc41 + 1.0.0 + + + + junit + junit + 4.12 + test + + + + org.mockito + mockito-core + 2.7.5 + test + + + + + + + + maven-compiler-plugin + 3.5.1 + + 1.8 + 1.8 + + + + maven-shade-plugin + 2.4.3 + + + package + + shade + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + \ No newline at end of file diff --git a/aws-blog-serverless-cf-analysis/readme.md b/aws-blog-serverless-cf-analysis/readme.md new file mode 100644 index 00000000..4bdcda1c --- /dev/null +++ b/aws-blog-serverless-cf-analysis/readme.md @@ -0,0 +1,83 @@ +Description +----------- + +Creates partitions in Athena on behalf of files added to S3 that use a 
`/year/month/day/hour/` key prefix. + +Build +----- + +As a one-off operation, you'll need to install the Athena JDBC driver into a lib folder, and then add it to your local Maven repository so that it can be incorporated into the final jar: + +``` +mkdir lib +aws s3 cp s3://athena-downloads/drivers/AthenaJDBC41-1.0.1.jar lib/ +mvn install:install-file -Dfile=lib/AthenaJDBC41-1.0.1.jar -DgroupId=com.amazonaws -DartifactId=athena.jdbc41 -Dversion=1.0.0 -Dpackaging=jar -DgeneratePom=true +``` + +And then, to build: + +``` +mvn clean compile verify +``` + +Create an IAM Role +------------------ + +Before you create a Lambda function, you will need to create an IAM role that allows Lambda to execute queries in Athena. Create a role named `lambda_athena_exec_role` and attach the following managed policies to the role: AmazonS3FullAccess, AmazonAthenaFullAccess. + +Add this inline access policy: + +``` +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Resource": "arn:aws:logs:*:*:*" + } + ] +} +``` + +And attach the following trust relationship to enable Lambda to assume the role: + +``` +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} +``` + +Create a Lambda Function to Add Partitions to Athena +---------------------------------------------------- + +Create a Lambda function that can be associated with S3 new object event notifications. When creating the function, you'll need to set several environment variables: + + - `PARTITION_TYPE` Supply one of the following values: `Month`, `Day` or `Hour`. This environment variable is optional: if you omit it, the function will default to `Day`. + - `TABLE_NAME` Use the format `<database>.<table>`. For example, `sampledb.vpc_flow_logs`. 
+ - `S3_STAGING_DIR` An Amazon S3 location to which your query output will be written. (Although the Lambda function is only executing DDL statements, Athena still writes an output file to S3.) + - `ATHENA_REGION` The region in which Athena is located (e.g. `us-east-1`). + - `DDB_TABLE_NAME` The name of the DynamoDB table holding partition information. + +Specify the handler and an existing role: + + - *Handler:* `com.amazonaws.services.lambda.CreateAthenaPartitionsBasedOnS3Event::handleRequest` + - *Existing role:* `lambda_athena_exec_role` + +Set the timeout to one minute. + + + diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3Event.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3Event.java new file mode 100644 index 00000000..5a24081c --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3Event.java @@ -0,0 +1,70 @@ +package com.amazonaws.services.lambda; + + +import com.amazonaws.services.lambda.model.Partition; +import com.amazonaws.services.lambda.model.PartitionConfig; +import com.amazonaws.services.lambda.model.S3Object; +import com.amazonaws.services.lambda.model.TableService; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.s3.event.S3EventNotification; + +import java.util.Collection; +import java.util.HashSet; +import java.util.stream.Collectors; + +public class CreateAthenaPartitionsBasedOnS3Event implements RequestHandler { + + private final PartitionConfig partitionConfig; + + public CreateAthenaPartitionsBasedOnS3Event() { + this(PartitionConfig.fromEnv()); + } + + CreateAthenaPartitionsBasedOnS3Event(PartitionConfig partitionConfig) { + this.partitionConfig = partitionConfig; + } + + 
@Override + public Void handleRequest(S3Event s3Event, Context context) { + + Collection requiredPartitions = new HashSet<>(); + TableService tableService = new TableService(); + + for (S3EventNotification.S3EventNotificationRecord record : s3Event.getRecords()) { + + String bucket = record.getS3().getBucket().getName(); + String key = record.getS3().getObject().getKey(); + + System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n", record.getEventName(), bucket, key); + + S3Object s3Object = new S3Object(bucket, key); + + if (s3Object.hasDateTimeKey()) { + requiredPartitions.add(partitionConfig.createPartitionFor(s3Object)); + } + } + + if (!requiredPartitions.isEmpty()) { + Collection missingPartitions = determineMissingPartitions( + partitionConfig.tableName(), + requiredPartitions, + tableService); + tableService.addPartitions(partitionConfig.tableName(), missingPartitions); + } + + return null; + } + + // We could use DynamoDB to store a list of existing partitions – quick then to check which of the required + // partitions already exist. 
+ private Collection determineMissingPartitions(String tableName, Collection requiredPartitions, TableService tableService) { + + Collection existingPartitions = tableService.getExistingPartitions(tableName); + + return requiredPartitions.stream() + .filter(p -> !existingPartitions.contains(p.spec())) + .collect(Collectors.toList()); + } +} \ No newline at end of file diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3EventWithDDB.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3EventWithDDB.java new file mode 100644 index 00000000..edadf94c --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/CreateAthenaPartitionsBasedOnS3EventWithDDB.java @@ -0,0 +1,98 @@ +package com.amazonaws.services.lambda; + +import com.amazonaws.athena.jdbc.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider; +import com.amazonaws.services.lambda.model.*; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.Table; +import com.amazonaws.services.dynamodbv2.document.spec.PutItemSpec; +import com.amazonaws.services.dynamodbv2.document.utils.NameMap; +import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException; +import com.amazonaws.services.lambda.model.PartitionConfig; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.s3.event.S3EventNotification; +import com.amazonaws.auth.profile.ProfileCredentialsProvider; + +import 
java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.stream.Collectors; + +public class CreateAthenaPartitionsBasedOnS3EventWithDDB implements RequestHandler { + + private final PartitionConfig partitionConfig; + + public CreateAthenaPartitionsBasedOnS3EventWithDDB() { + this(PartitionConfig.fromEnv()); + } + + CreateAthenaPartitionsBasedOnS3EventWithDDB(PartitionConfig partitionConfig) { + this.partitionConfig = partitionConfig; + } + + @Override + public Void handleRequest(S3Event s3Event, Context context){ + + CollectionrequiredPartitions = new HashSet<>(); + TableService tableService = new TableService(); + DynamoDB dynamoDBClient=new DynamoDB(new AmazonDynamoDBClient(new EnvironmentVariableCredentialsProvider())); + + for(S3EventNotification.S3EventNotificationRecord record:s3Event.getRecords()){ + + String bucket=record.getS3().getBucket().getName(); + String key=record.getS3().getObject().getKey(); + + System.out.printf("S3event[Event:%s,Bucket:%s,Key:%s]%n",record.getEventName(),bucket,key); + + S3Object s3Object=new S3Object(bucket,key); + + if(s3Object.hasDateTimeKey()){ + Partition partition = partitionConfig.createPartitionFor(s3Object); + + //Check if the partition exists in DynamoDBtable, if not add the partition details to the table, skip otherwise + if (tryAddMissingPartition(partitionConfig.dynamoDBTableName(), dynamoDBClient, partition)) { + requiredPartitions.add(partition); + } + } + } + + if(!requiredPartitions.isEmpty()){ + tableService.addPartitions(partitionConfig.tableName(),requiredPartitions, true); + } + + return null; + } + + //ReturntrueifpartitionisnotinDynamoDBandaddthepartition,falseotherwise + private boolean tryAddMissingPartition(String dyanmoDBTaableName,DynamoDB dynamoDBClient, Partition partition){ + + Table ddbTable= dynamoDBClient.getTable(dyanmoDBTaableName); + + Item item=new Item() + .withPrimaryKey("PartitionSpec",partition.spec()) + .withString("PartitionPath",partition.path()) + 
.withString("PartitionName", partition.name()); + + PutItemSpec itemSpec=new PutItemSpec() + .withItem(item) + .withConditionExpression("attribute_not_exists(#ps)") + .withNameMap(new NameMap() + .with("#ps","PartitionSpec")); + + try{ + ddbTable.putItem(itemSpec); + System.out.println("Item was added to the table.PartitionSpec="+partition.spec()+"; Path="+partition.path()); + return true; + } + catch(ConditionalCheckFailedException e){ + System.out.println(e.toString()); + System.out.println("Item already exists. PartitionSpec="+partition.spec()+"; Path="+partition.path()); + return false; + } + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/RemoveAthenaPartitions.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/RemoveAthenaPartitions.java new file mode 100644 index 00000000..6d6724d5 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/RemoveAthenaPartitions.java @@ -0,0 +1,64 @@ +package com.amazonaws.services.lambda; + +import com.amazonaws.services.lambda.model.ExpirationConfig; +import com.amazonaws.services.lambda.model.PartitionConfig; +import com.amazonaws.services.lambda.model.TableService; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import com.amazonaws.services.lambda.utils.Clock; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.HashSet; + +public class RemoveAthenaPartitions implements RequestStreamHandler { + + private static final DateTimeFormatter DATE_TIME_PATTERN = DateTimeFormat.forPattern("yyyy-MM-dd-HH"); + + private final PartitionConfig partitionConfig; + private final ExpirationConfig expirationConfig; + private final TableService 
tableService; + private final Clock clock; + + public RemoveAthenaPartitions() { + this(PartitionConfig.fromEnv(), ExpirationConfig.fromEnv(), new TableService(), Clock.SystemClock); + } + + RemoveAthenaPartitions(PartitionConfig partitionConfig, ExpirationConfig expirationConfig, TableService tableService, Clock clock) { + this.partitionConfig = partitionConfig; + this.expirationConfig = expirationConfig; + this.tableService = tableService; + this.clock = clock; + } + + @Override + public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { + Collection partitionsToRemove = new HashSet<>(); + + DateTime expiryThreshold = partitionConfig.partitionType().roundDownTimeUnit(clock.now()) + .minus(expirationConfig.expiresAfterMillis()); + + Collection existingPartitions = tableService.getExistingPartitions(partitionConfig.tableName()); + + for (String existingPartition : existingPartitions) { + DateTime partitionDateTime = partitionConfig.partitionType().roundDownTimeUnit( + DateTime.parse(existingPartition, DATE_TIME_PATTERN)); + if (hasExpired(partitionDateTime, expiryThreshold)) { + partitionsToRemove.add(existingPartition); + } + } + + if (!partitionsToRemove.isEmpty()) { + tableService.removePartitions(partitionConfig.tableName(), partitionsToRemove); + } + } + + private boolean hasExpired(DateTime partitionDateTime, DateTime expiryThreshold) { + return partitionDateTime.isEqual(expiryThreshold) || partitionDateTime.isBefore(expiryThreshold); + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/RemoveAthenaPartitionsBasedOnS3Event.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/RemoveAthenaPartitionsBasedOnS3Event.java new file mode 100644 index 00000000..56855a41 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/RemoveAthenaPartitionsBasedOnS3Event.java @@ -0,0 +1,56 @@ +package 
com.amazonaws.services.lambda; + +import com.amazonaws.services.lambda.model.Partition; +import com.amazonaws.services.lambda.model.PartitionConfig; +import com.amazonaws.services.lambda.model.S3Object; +import com.amazonaws.services.lambda.model.TableService; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.s3.event.S3EventNotification; + +import java.util.Collection; +import java.util.HashSet; +import java.util.stream.Collectors; + +public class RemoveAthenaPartitionsBasedOnS3Event implements RequestHandler { + + private final PartitionConfig partitionConfig; + + public RemoveAthenaPartitionsBasedOnS3Event() { + this(PartitionConfig.fromEnv()); + } + + RemoveAthenaPartitionsBasedOnS3Event(PartitionConfig partitionConfig) { + this.partitionConfig = partitionConfig; + } + + @Override + public Void handleRequest(S3Event s3Event, Context context) { + + Collection partitionsToRemove = new HashSet<>(); + TableService tableService = new TableService(); + + for (S3EventNotification.S3EventNotificationRecord record : s3Event.getRecords()) { + + String bucket = record.getS3().getBucket().getName(); + String key = record.getS3().getObject().getKey(); + + System.out.printf("S3 event [Event: %s, Bucket: %s, Key: %s]%n", record.getEventName(), bucket, key); + + S3Object s3Object = new S3Object(bucket, key); + + if (s3Object.hasDateTimeKey()) { + partitionsToRemove.add(partitionConfig.createPartitionFor(s3Object)); + } + } + + if (!partitionsToRemove.isEmpty()) { + tableService.removePartitions( + partitionConfig.tableName(), + partitionsToRemove.stream().map(Partition::spec).collect(Collectors.toList())); + } + + return null; + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/ExpirationConfig.java 
b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/ExpirationConfig.java new file mode 100644 index 00000000..3fe19429 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/ExpirationConfig.java @@ -0,0 +1,27 @@ +package com.amazonaws.services.lambda.model; + + +import com.amazonaws.services.lambda.utils.EnvironmentVariableUtils; +import com.amazonaws.services.lambda.utils.TimeUnitParser; + +import java.util.concurrent.TimeUnit; + +public class ExpirationConfig { + + public static ExpirationConfig fromEnv() { + long expiresAfterMillis = TimeUnitParser.parse( + EnvironmentVariableUtils.getOptionalEnv("EXPIRES_AFTER", String.valueOf(TimeUnit.DAYS.toMillis(30))), + TimeUnit.MILLISECONDS); + return new ExpirationConfig(expiresAfterMillis); + } + + private final long expiresAfterMillis; + + public ExpirationConfig(long expiresAfterMillis) { + this.expiresAfterMillis = expiresAfterMillis; + } + + public long expiresAfterMillis() { + return expiresAfterMillis; + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/Partition.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/Partition.java new file mode 100644 index 00000000..1f2eac79 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/Partition.java @@ -0,0 +1,51 @@ +package com.amazonaws.services.lambda.model; + +public class Partition { + + public static String NAME = "IngestDateTime"; + + private final String name; + private final String spec; + private final String path; + + + + public Partition(String spec, String path) { + this(spec, path, NAME); + } + + public Partition(String spec, String path, String name){ + this.spec = spec; + this.path = path; + this.name = name; + } + + public String name() { return name; } + + public String spec() { + return spec; + } + + public String path() { + return path; + } 
+ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Partition partition = (Partition) o; + + if (!spec.equals(partition.spec)) return false; + return path.equals(partition.path); + + } + + @Override + public int hashCode() { + int result = spec.hashCode(); + result = 31 * result + path.hashCode(); + return result; + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/PartitionConfig.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/PartitionConfig.java new file mode 100644 index 00000000..3d5ed295 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/PartitionConfig.java @@ -0,0 +1,54 @@ +package com.amazonaws.services.lambda.model; + +import com.amazonaws.services.lambda.utils.EnvironmentVariableUtils; + +public class PartitionConfig { + + public static PartitionConfig fromEnv() { + PartitionType partitionType = PartitionType.parse( + EnvironmentVariableUtils.getOptionalEnv("PARTITION_TYPE", PartitionType.Day.name())); + String tableName = EnvironmentVariableUtils.getMandatoryEnv("TABLE_NAME"); + String dynamoDBTableName = EnvironmentVariableUtils.getOptionalEnv("DDB_TABLE_NAME", ""); + + return new PartitionConfig(partitionType, tableName, dynamoDBTableName); + } + + private final PartitionType partitionType; + private final String tableName; + private final String dynamoDBTableName; + + public PartitionConfig(PartitionType partitionType, String tableName) { + this(partitionType, tableName, ""); + } + + public PartitionConfig(PartitionType partitionType, String tableName, String dynamoDBTableName) { + this.partitionType = partitionType; + this.tableName = tableName; + this.dynamoDBTableName = dynamoDBTableName; + } + + public PartitionType partitionType() { + return partitionType; + } + + public String tableName() { + return tableName; + } 
+ + public String dynamoDBTableName() { + return dynamoDBTableName; + } + + public Partition createPartitionFor(S3Object s3Object) { + return new Partition( + partitionType.createSpec(s3Object.parseDateTimeFromKey()), + partitionType.createPath(s3Object)); + } + + public Partition createPartitionWithPartitionTypeAsNameFor(S3Object s3Object) { + return new Partition( + partitionType.createSpec(s3Object.parseDateTimeFromKey()), + partitionType.createPath(s3Object), + partitionType.name()); + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/PartitionType.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/PartitionType.java new file mode 100644 index 00000000..3882f5f7 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/PartitionType.java @@ -0,0 +1,112 @@ +package com.amazonaws.services.lambda.model; + +import org.joda.time.DateTime; + +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public enum PartitionType { + ALL { + @Override + public String createSpec(DateTime dateTime) { + return String.format("Year='%04d',Month='%02d',Day='%02d',Hour='%02d'", + dateTime.getYear(), + dateTime.getMonthOfYear(), + dateTime.getDayOfMonth(), + dateTime.getHourOfDay()); + } + + @Override + public String createPath(S3Object s3Object) { + return createPartitionRoot(1, s3Object); + } + + @Override + public DateTime roundDownTimeUnit(DateTime value) { + return value.withMinuteOfHour(0) + .withSecondOfMinute(0) + .withMillisOfSecond(0); + } + }, + Month { + @Override + public String createSpec(DateTime dateTime) { + return String.format("%04d-%02d-01-00", dateTime.getYear(), dateTime.getMonthOfYear()); + } + + @Override + public String createPath(S3Object s3Object) { + return createPartitionRoot(3, s3Object); + } + + @Override + public DateTime roundDownTimeUnit(DateTime value) { + return value.withDayOfMonth(1) + 
.withHourOfDay(0) + .withMinuteOfHour(0) + .withSecondOfMinute(0) + .withMillisOfSecond(0); + } + }, + Day { + @Override + public String createSpec(DateTime dateTime) { + return String.format("%04d-%02d-%02d-00", dateTime.getYear(), dateTime.getMonthOfYear(), dateTime.getDayOfMonth()); + } + + @Override + public String createPath(S3Object s3Object) { + return createPartitionRoot(2, s3Object); + } + + @Override + public DateTime roundDownTimeUnit(DateTime value) { + return value.withHourOfDay(0) + .withMinuteOfHour(0) + .withSecondOfMinute(0) + .withMillisOfSecond(0); + } + }, + Hour { + @Override + public String createSpec(DateTime dateTime) { + return String.format("%04d-%02d-%02d-%02d", dateTime.getYear(), dateTime.getMonthOfYear(), dateTime.getDayOfMonth(), dateTime.getHourOfDay()); + } + + @Override + public String createPath(S3Object s3Object) { + return createPartitionRoot(1, s3Object); + } + + @Override + public DateTime roundDownTimeUnit(DateTime value) { + return value.withMinuteOfHour(0) + .withSecondOfMinute(0) + .withMillisOfSecond(0); + } + }; + + public static PartitionType parse(String value) { + for (PartitionType partitionType : PartitionType.values()) { + if (value.equalsIgnoreCase(partitionType.name())) { + return partitionType; + } + } + + throw new IllegalArgumentException("Unrecognized PartitionType value: " + value); + } + + public abstract String createSpec(org.joda.time.DateTime dateTime); + + public abstract String createPath(S3Object s3Object); + + public abstract DateTime roundDownTimeUnit(DateTime value); + + private static String createPartitionRoot(int endIndex, S3Object s3Object) { + String newKey = IntStream.range(0, s3Object.keyLength() - endIndex) + .mapToObj(s3Object::keyPart) + .filter(s -> !s.isEmpty()) + .collect(Collectors.joining("/")); + return String.format("s3://%s/%s/", s3Object.bucket(), newKey); + } +} \ No newline at end of file diff --git 
a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/S3Object.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/S3Object.java new file mode 100644 index 00000000..7f0b66b2 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/S3Object.java @@ -0,0 +1,67 @@ +package com.amazonaws.services.lambda.model; + +import org.joda.time.DateTime; + +public class S3Object { + + private final String bucket; + private final String key; + private final String[] keyParts; + private final DateTime dateTime; + + public S3Object(String bucket, String key) { + this.bucket = bucket; + this.key = key; + this.keyParts = key.split("/"); + this.dateTime = parseDateTime(); + } + + public String bucket() { + return bucket; + } + + public String key() { + return key; + } + + public String keyPart(int index){ + return keyParts[index]; + } + + public int keyLength(){ + return keyParts.length; + } + + public boolean hasDateTimeKey() { + return dateTime != null; + } + + DateTime parseDateTimeFromKey() { + return dateTime; + } + + private DateTime parseDateTime() { + try { + return new DateTime(year(), month(), day(), hour(), 0); + } catch (Exception e) { + return null; + } + } + + private int year() { + return Integer.parseInt(keyParts[keyParts.length - 5]); + } + + private int month() { + return Integer.parseInt(keyParts[keyParts.length - 4]); + } + + private int day() { + return Integer.parseInt(keyParts[keyParts.length - 3]); + } + + private int hour() { + return Integer.parseInt(keyParts[keyParts.length - 2]); + } +} + diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/TableService.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/TableService.java new file mode 100644 index 00000000..48b0efb3 --- /dev/null +++ 
b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/model/TableService.java @@ -0,0 +1,99 @@ +package com.amazonaws.services.lambda.model; + +import com.amazonaws.services.lambda.utils.EnvironmentVariableUtils; + +import java.sql.*; +import java.util.Collection; +import java.util.HashSet; +import java.util.Properties; + + +public class TableService { + + public Collection getExistingPartitions(String tableName) { + + Collection existingPartitions = new HashSet<>(); + + try (Connection conn = AthenaConnection.getConnection(); Statement statement = conn.createStatement()) { + + String sql = String.format("SHOW PARTITIONS %s", tableName); + try (ResultSet rs = statement.executeQuery(sql)) { + while (rs.next()) { + existingPartitions.add( + rs.getString(1).toUpperCase().replace(String.format("%S=", Partition.NAME), "")); + } + } + + } catch (Exception ex) { + throw new IllegalStateException("An error occurred while getting existing partitions.", ex); + } + + return existingPartitions; + } + + public void addPartitions(String tableName, Collection partitions){ + addPartitions(tableName, partitions, false); + } + + public void addPartitions(String tableName, Collection partitions, boolean multiplePartition) { + + try (Connection conn = AthenaConnection.getConnection(); Statement statement = conn.createStatement()) { + for (Partition partition : partitions) { + String sql = ""; + + if(multiplePartition) { + sql = String.format("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s) LOCATION '%s'", + tableName, partition.spec(), partition.path()); + } else { + sql = String.format("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s='%s') LOCATION '%s'", + tableName, partition.name(), partition.spec(), partition.path()); + } + + + System.out.printf("SQL: %s%n", sql); + statement.execute(sql); + System.out.printf("New partition [Spec: %s, Path: %s]%n", partition.spec(), partition.path()); + } + } catch (Exception ex) { + throw new IllegalStateException("An 
error occurred while adding partitions.", ex); + } + } + + public void removePartitions(String tableName, Collection partitionSpecs) { + + try (Connection conn = AthenaConnection.getConnection(); Statement statement = conn.createStatement()) { + for (String partitionSpec : partitionSpecs) { + String sql = String.format("ALTER TABLE %s DROP IF EXISTS PARTITION (%s='%s')", + tableName, Partition.NAME, partitionSpec); + System.out.printf("SQL: %s%n", sql); + statement.execute(sql); + System.out.printf("Removed partition [Spec: %s]%n", partitionSpec); + } + } catch (Exception ex) { + throw new IllegalStateException("An error occurred while removing partitions.", ex); + } + } + + private static class AthenaConnection { + + private static final String JDBC_DRIVER = "com.amazonaws.athena.jdbc.AthenaDriver"; + private static final String DATABASE_URL_TEMPLATE = "jdbc:awsathena://athena.%s.amazonaws.com:443"; + private static final String CREDENTIALS_PROVIDER = "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"; + + static Connection getConnection() throws ClassNotFoundException, SQLException { + + Class.forName(JDBC_DRIVER); + + String s3StagingDir = EnvironmentVariableUtils.getMandatoryEnv("S3_STAGING_DIR"); + String region = EnvironmentVariableUtils.getOptionalEnv("ATHENA_REGION", EnvironmentVariableUtils.getMandatoryEnv(("AWS_DEFAULT_REGION"))); + + String databaseUrl = String.format(DATABASE_URL_TEMPLATE, region); + + Properties properties = new Properties(); + properties.put("s3_staging_dir", s3StagingDir); + properties.put("aws_credentials_provider_class", CREDENTIALS_PROVIDER); + + return DriverManager.getConnection(databaseUrl, properties); + } + } +} diff --git a/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/utils/Clock.java b/aws-blog-serverless-cf-analysis/src/main/java/com/amazonaws/services/lambda/utils/Clock.java new file mode 100644 index 00000000..87a5fafc --- /dev/null +++ 
/**
 * Helpers for reading configuration from environment variables. A variable that is unset or set
 * to the empty string is treated as missing.
 */
public class EnvironmentVariableUtils {

    /**
     * Returns the value of a required environment variable.
     *
     * @param name the variable name
     * @return the non-empty value
     * @throws IllegalStateException if the variable is unset or empty
     */
    public static String getMandatoryEnv(String name) {
        // Read once, and use a plain check rather than a Guava class shaded inside the JDBC driver.
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException(String.format("Missing environment variable: %s", name));
        }
        return value;
    }

    /**
     * Returns the value of an optional environment variable.
     *
     * @param name         the variable name
     * @param defaultValue the value to return when the variable is unset or empty
     * @return the variable's value, or {@code defaultValue}
     */
    public static String getOptionalEnv(String name, String defaultValue) {
        String value = System.getenv(name);
        if (value == null || value.isEmpty()) {
            return defaultValue;
        }
        return value;
    }
}
TimeUnitParser { + private static final PeriodFormatter PARSER = new PeriodFormatterBuilder() + .appendMillis().appendSuffix("ms").appendSeparatorIfFieldsBefore(" ") + .appendMinutes().appendSuffix("min").appendSeparatorIfFieldsBefore(" ") + .appendSeconds().appendSuffix("s").appendSeparatorIfFieldsBefore(" ") + .appendHours().appendSuffix("h", "hr").appendSeparatorIfFieldsBefore(" ") + .appendDays().appendSuffix("d").appendSeparatorIfFieldsBefore(" ") + .toFormatter(); + + public static long parse(String value, TimeUnit timeUnit) { + try { + return timeUnit.convert(Long.parseLong(value), TimeUnit.MILLISECONDS); + } catch (NumberFormatException e) { + long millis = PARSER.parsePeriod(value.replace(" ", "")).toStandardDuration().getMillis(); + return timeUnit.convert(millis, TimeUnit.MILLISECONDS); + } + } +} + diff --git a/aws-blog-serverless-cf-analysis/src/main/main.iml b/aws-blog-serverless-cf-analysis/src/main/main.iml new file mode 100644 index 00000000..55e41d75 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/main/main.iml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/RemoveAthenaPartitionsTest.java b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/RemoveAthenaPartitionsTest.java new file mode 100644 index 00000000..2818db44 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/RemoveAthenaPartitionsTest.java @@ -0,0 +1,117 @@ +package com.amazonaws.services.lambda; + +import com.amazonaws.services.lambda.model.ExpirationConfig; +import com.amazonaws.services.lambda.model.PartitionConfig; +import com.amazonaws.services.lambda.model.PartitionType; +import com.amazonaws.services.lambda.model.TableService; +import com.amazonaws.services.lambda.utils.Clock; +import com.amazonaws.services.lambda.utils.TimeUnitParser; +import org.joda.time.DateTime; +import org.junit.Test; + 
+import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +import static java.util.Arrays.asList; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + /** Tests RemoveAthenaPartitions against a mocked TableService and a fixed clock, checking which partition specs are passed to removePartitions. */ +public class RemoveAthenaPartitionsTest { + + + /** Month partitions, 50-day expiration, clock fixed at 2017-01-15 13:45:10.555: expects removePartitions to receive the 2016-11 through 2016-06 specs; the 2017-01 and 2016-12 specs are not in the expected set. */ @Test + public void shouldAskTableServiceToRemoveExpiredPartitionsForMonthBasedPartitions() throws IOException { + + String tableName = "vpc_flow_logs"; + PartitionConfig partitionConfig = new PartitionConfig(PartitionType.Month, tableName); + ExpirationConfig expirationConfig = new ExpirationConfig(TimeUnit.DAYS.toMillis(50)); + TableService tableService = mock(TableService.class); + Clock clock = () -> new DateTime(2017, 1, 15, 13, 45, 10, 555); + + when (tableService.getExistingPartitions(tableName)).thenReturn(asList( + "2017-01-01-00", + "2016-12-01-00", + "2016-11-01-00", + "2016-10-01-00", + "2016-09-01-00", + "2016-08-01-00", + "2016-07-01-00", + "2016-06-01-00" + + )); + + RemoveAthenaPartitions lambda = new RemoveAthenaPartitions(partitionConfig, expirationConfig, tableService, clock); + lambda.handleRequest(null, null, null); + + verify(tableService).removePartitions(tableName, new HashSet<>( asList( + "2016-11-01-00", + "2016-10-01-00", + "2016-09-01-00", + "2016-08-01-00", + "2016-07-01-00", + "2016-06-01-00"))); + } + + /** Day partitions, 5-day expiration, same fixed clock: expects removePartitions to receive the 2017-01-10, 2017-01-09 and 2017-01-08 specs. */ @Test + public void shouldAskTableServiceToRemoveExpiredPartitionsForDayBasedPartitions() throws IOException { + + String tableName = "vpc_flow_logs"; + PartitionConfig partitionConfig = new PartitionConfig(PartitionType.Day, tableName); + ExpirationConfig expirationConfig = new ExpirationConfig(TimeUnit.DAYS.toMillis(5)); + TableService tableService = mock(TableService.class); + Clock clock = () -> new DateTime(2017, 1, 15, 13, 45, 10, 555); + + when (tableService.getExistingPartitions(tableName)).thenReturn(asList( + "2017-01-15-00", + "2017-01-14-00", +
"2017-01-13-00", + "2017-01-12-00", + "2017-01-11-00", + "2017-01-10-00", + "2017-01-09-00", + "2017-01-08-00" + )); + + RemoveAthenaPartitions lambda = new RemoveAthenaPartitions(partitionConfig, expirationConfig, tableService, clock); + lambda.handleRequest(null, null, null); + + verify(tableService).removePartitions(tableName, new HashSet<>( asList( + "2017-01-10-00", + "2017-01-09-00", + "2017-01-08-00"))); + } + + /** Hour partitions, 5-hour expiration, same fixed clock: expects removePartitions to receive the 2017-01-15-08, 2017-01-15-07 and 2017-01-15-06 specs. NOTE(review): the stubbed list below contains 2017-01-15-12 twice and no 2017-01-15-11 entry; possibly a typo, confirm. */ @Test + public void shouldAskTableServiceToRemoveExpiredPartitionsForHourBasedPartitions() throws IOException { + + String tableName = "vpc_flow_logs"; + PartitionConfig partitionConfig = new PartitionConfig(PartitionType.Hour, tableName); + ExpirationConfig expirationConfig = new ExpirationConfig(TimeUnit.HOURS.toMillis(5)); + TableService tableService = mock(TableService.class); + Clock clock = () -> new DateTime(2017, 1, 15, 13, 45, 10, 555); + + when (tableService.getExistingPartitions(tableName)).thenReturn(asList( + "2017-01-15-13", + "2017-01-15-12", + "2017-01-15-12", + "2017-01-15-10", + "2017-01-15-09", + "2017-01-15-08", + "2017-01-15-07", + "2017-01-15-06" + )); + + RemoveAthenaPartitions lambda = new RemoveAthenaPartitions(partitionConfig, expirationConfig, tableService, clock); + lambda.handleRequest(null, null, null); + + verify(tableService).removePartitions(tableName, new HashSet<>( asList( + "2017-01-15-08", + "2017-01-15-07", + "2017-01-15-06"))); + } +} \ No newline at end of file diff --git a/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/model/PartitionTypeTest.java b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/model/PartitionTypeTest.java new file mode 100644 index 00000000..c20200e2 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/model/PartitionTypeTest.java @@ -0,0 +1,30 @@ +package com.amazonaws.services.lambda.model; + +import org.joda.time.DateTime; +import org.junit.Test; + +import static
org.junit.Assert.assertEquals; + /** Tests PartitionType path creation and date-time rounding for the Month, Day and Hour values. */ +public class PartitionTypeTest { + + /** Expects createPath to return an s3 URL for the bucket that keeps the object key up to and including its month, day or hour segment, with a trailing slash. */ @Test + public void shouldFormatPartitionRoot() { + assertEquals("s3://my-root/prefix/2016/12/", + PartitionType.Month.createPath(new S3Object("my-root", "/prefix/2016/12/15/03/some-file.txt"))); + + assertEquals("s3://my-root/prefix/2016/12/15/", + PartitionType.Day.createPath(new S3Object("my-root", "/prefix/2016/12/15/03/some-file.txt"))); + + assertEquals("s3://my-root/prefix/2016/12/15/03/", + PartitionType.Hour.createPath(new S3Object("my-root", "/prefix/2016/12/15/03/some-file.txt"))); + } + + /** Expects roundDownTimeUnit to truncate 2016-11-28 16:53:15.555 to the start of its month, day or hour respectively. */ @Test + public void shouldRoundDownDateTime() { + DateTime initialValue = new DateTime(2016, 11, 28, 16, 53, 15, 555); + + assertEquals(new DateTime(2016, 11, 1, 0, 0), PartitionType.Month.roundDownTimeUnit(initialValue)); + assertEquals(new DateTime(2016, 11, 28, 0, 0), PartitionType.Day.roundDownTimeUnit(initialValue)); + assertEquals(new DateTime(2016, 11, 28, 16, 0), PartitionType.Hour.roundDownTimeUnit(initialValue)); + } +} \ No newline at end of file diff --git a/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/model/S3ObjectTest.java b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/model/S3ObjectTest.java new file mode 100644 index 00000000..4ad881dc --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/model/S3ObjectTest.java @@ -0,0 +1,26 @@ +package com.amazonaws.services.lambda.model; + +import org.joda.time.DateTime; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + /** Tests how S3Object derives date-time information from an object key. */ +public class S3ObjectTest { + /** Expects a key containing the segments 2016/12/15/03 to parse as 2016-12-15 03:00. */ @Test + public void shouldParseDateFromKey() { + String key = "/prefix/2016/12/15/03/some-file.txt"; + + S3Object s3Object = new S3Object("my-bucket", key); + assertEquals(new DateTime(2016, 12, 15, 3, 0), s3Object.parseDateTimeFromKey()); + } + + /** Expects hasDateTimeKey to be false for a key that has no date-time segments. */ @Test + public void shouldIndicateWhetherKeyContainsDateTimePart() { + String key
= "/prefix/a/b/some-file.txt"; + + S3Object s3Object = new S3Object("my-bucket", key); + assertFalse(s3Object.hasDateTimeKey()); + } +} \ No newline at end of file diff --git a/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/utils/TimeUnitParserTest.java b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/utils/TimeUnitParserTest.java new file mode 100644 index 00000000..dbcbd879 --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/test/java/com/amazonaws/services/lambda/utils/TimeUnitParserTest.java @@ -0,0 +1,39 @@ +package com.amazonaws.services.lambda.utils; + +import org.junit.Test; + +import java.text.ParseException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + + /** Tests TimeUnitParser.parse for suffixed and unsuffixed inputs and for two target units. */ +public class TimeUnitParserTest { + + /** Expects each supported suffix (ms, s, min, h, hr) to convert to the matching number of milliseconds. */ @Test + public void shouldParseMillisFromString() throws ParseException + { + assertEquals( 10, TimeUnitParser.parse( "10ms", TimeUnit.MILLISECONDS ) ); + assertEquals( 10000, TimeUnitParser.parse( "10s", TimeUnit.MILLISECONDS ) ); + assertEquals( 60000, TimeUnitParser.parse( "1min", TimeUnit.MILLISECONDS ) ); + assertEquals( 3600000, TimeUnitParser.parse( "1h", TimeUnit.MILLISECONDS ) ); + assertEquals( 3600000, TimeUnitParser.parse( "1hr", TimeUnit.MILLISECONDS ) ); + } + + /** Expects a value with no suffix to be read as milliseconds before conversion to the requested unit. */ @Test + public void shouldInterpretValueWithoutSuffixAsMillis() + { + assertEquals( 10, TimeUnitParser.parse( "10", TimeUnit.MILLISECONDS ) ); + assertEquals( 10, TimeUnitParser.parse( "10000", TimeUnit.SECONDS ) ); + } + + /** Expects suffixed values to convert to whole seconds; 10ms is expected to yield 0. */ @Test + public void shouldParseSecondsFromString() throws ParseException + { + assertEquals( 0, TimeUnitParser.parse( "10ms", TimeUnit.SECONDS ) ); + assertEquals( 10, TimeUnitParser.parse( "10s", TimeUnit.SECONDS ) ); + assertEquals( 60, TimeUnitParser.parse( "1min", TimeUnit.SECONDS ) ); + assertEquals( 3600, TimeUnitParser.parse( "1h", TimeUnit.SECONDS ) ); + assertEquals( 3600, TimeUnitParser.parse( "1hr", TimeUnit.SECONDS ) ); + } +} \ No newline at end of file diff --git
a/aws-blog-serverless-cf-analysis/src/test/test.iml b/aws-blog-serverless-cf-analysis/src/test/test.iml new file mode 100644 index 00000000..a0e49a3b --- /dev/null +++ b/aws-blog-serverless-cf-analysis/src/test/test.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file