-
Notifications
You must be signed in to change notification settings - Fork 0
/
GDCReplicationPlanner.java
138 lines (121 loc) · 5.67 KB
/
GDCReplicationPlanner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0
package com.amazonaws.gdcreplication.lambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.stream.Collectors;
import com.amazonaws.gdcreplication.util.DDBUtil;
import com.amazonaws.gdcreplication.util.GlueUtil;
import com.amazonaws.gdcreplication.util.SNSUtil;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.Database;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
/**
* This class contains the AWS Lambda handler method. Upon invocation, it fetches all the
* databases from the Glue Catalog and, for each database, takes the following
* actions:
* 1. Convert Glue Database object to JSON String (This is Database DDL)
* 2. Publish the Database DDL to an SNS Topic
* 3. Insert a record to a DynamoDB table for status tracking
*
* @author Ravi Itha, Amazon Web Services, Inc.
*
*/
public class GDCReplicationPlanner implements RequestHandler<Object, String> {
/**
 * Lambda entry point. Reads its configuration from environment variables
 * (falling back to built-in defaults), fetches the databases of the source
 * Glue catalog, optionally narrows them down by name prefix, and hands the
 * selection to {@code SNSUtil.publishDatabaseSchemasToSNS}.
 *
 * When {@code database_prefix_list} is empty or unset every database is
 * published; otherwise only the databases matching one of the prefixes are.
 *
 * @param input   the invocation payload; it is only logged
 * @param context the Lambda context, used for its logger
 * @return a fixed completion message
 */
@Override
public String handleRequest(Object input, Context context) {
    context.getLogger().log("Input: " + input);

    // Configuration, each value falling back to a default when the variable is unset
    final String awsRegion = envOrDefault("region", Regions.US_EAST_1.getName());
    final String catalogId = envOrDefault("source_glue_catalog_id", "1234567890");
    final String prefixes = envOrDefault("database_prefix_list", "");
    final String prefixSeparator = envOrDefault("separator", "|");
    final String snsTopicArn = envOrDefault("sns_topic_arn_gdc_replication_planner",
            "arn:aws:sns:us-east-1:1234567890:GlueExportSNSTopic");
    final String statusTable = envOrDefault("ddb_name_gdc_replication_planner",
            "ddb_name_gdc_replication_planner");

    printEnvVariables(catalogId, snsTopicArn, statusTable, prefixes, prefixSeparator);

    // Service clients for Glue and SNS, both bound to the configured region
    final AWSGlue glueClient = AWSGlueClientBuilder.standard().withRegion(awsRegion).build();
    final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(awsRegion).build();

    // Project utility helpers
    final DDBUtil ddbUtil = new DDBUtil();
    final SNSUtil snsUtil = new SNSUtil();
    final GlueUtil glueUtil = new GlueUtil();

    final List<Database> allDatabases = glueUtil.getDatabases(glueClient, catalogId);

    // No prefix configured -> export everything; otherwise keep only the
    // databases whose name matches one of the configured prefixes.
    final List<Database> selectedDatabases = prefixes.isEmpty()
            ? allDatabases
            : getRequiredDatabases(allDatabases, tokenizeDatabasePrefixString(prefixes, prefixSeparator));

    // The DynamoDB helper and table name are passed through to the publisher
    // (presumably for status tracking -- see SNSUtil).
    final int exportedCount = snsUtil.publishDatabaseSchemasToSNS(snsClient, selectedDatabases, snsTopicArn,
            ddbUtil, statusTable, catalogId);

    System.out.printf(
            "Database export statistics: number of databases exist = %d, number of databases exported to SNS = %d. \n",
            allDatabases.size(), exportedCount);
    return "Lambda function to get a list of Databases completed successfully!";
}

/**
 * Returns the value of the named environment variable, or {@code fallback}
 * when the variable is not set.
 */
private static String envOrDefault(String name, String fallback) {
    return Optional.ofNullable(System.getenv(name)).orElse(fallback);
}
/**
 * Writes the effective configuration values to standard output, one
 * labelled value per line.
 *
 * @param sourceGlueCatalogId           id of the source Glue catalog
 * @param topicArn                      ARN of the SNS topic
 * @param ddbTblNameForDBStatusTracking name of the DynamoDB tracking table
 * @param dbPrefixString                raw, still separator-joined database prefix list
 * @param separator                     separator used inside {@code dbPrefixString}
 */
public static void printEnvVariables(String sourceGlueCatalogId, String topicArn,
        String ddbTblNameForDBStatusTracking, String dbPrefixString, String separator) {
    final String[] report = {
        "SNS Topic Arn: " + topicArn,
        "Source Catalog Id: " + sourceGlueCatalogId,
        "Database Prefix String: " + dbPrefixString,
        "Prefix Separator: " + separator,
        "DynamoDB Table to track GDC Replication Planning: " + ddbTblNameForDBStatusTracking
    };
    for (String line : report) {
        System.out.println(line);
    }
}
/**
 * Splits the database prefix string into a list of individual prefixes.
 *
 * Splitting uses {@link StringTokenizer}, so every character of
 * {@code separator} acts as a delimiter and consecutive delimiters never
 * produce empty tokens. Each token is additionally trimmed, and tokens that
 * are blank after trimming are dropped, so that an input such as
 * {@code "sales | hr"} yields {@code ["sales", "hr"]} rather than prefixes
 * with stray whitespace that could never match a database name.
 *
 * @param str       the prefix string to split; {@code null} is treated as empty
 * @param separator the delimiter character(s) between prefixes
 * @return a mutable list of the non-blank, trimmed prefixes, in input order
 */
public static List<String> tokenizeDatabasePrefixString(String str, String separator) {
    List<String> dbPrefixesList = new ArrayList<String>();
    if (str != null) {
        StringTokenizer tokenizer = new StringTokenizer(str, separator);
        while (tokenizer.hasMoreTokens()) {
            String prefix = tokenizer.nextToken().trim();
            // Skip tokens that consisted solely of whitespace
            if (!prefix.isEmpty()) {
                dbPrefixesList.add(prefix);
            }
        }
    }
    System.out.println("Number of database prefixes: " + dbPrefixesList.size());
    return dbPrefixesList;
}
/**
 * Selects the databases whose name starts with at least one of the given
 * prefixes. The comparison ignores case on both sides, so a prefix written
 * with upper-case letters still matches. Each database is added at most once,
 * and the order of {@code dBList} is preserved.
 *
 * @param dBList         the candidate databases
 * @param dbPrefixesList the name prefixes to match against
 * @return a new list holding the databases that matched a prefix
 */
public static List<Database> getRequiredDatabases(List<Database> dBList, List<String> dbPrefixesList) {
    List<Database> dBsToExportList = new ArrayList<Database>();
    for (Database database : dBList) {
        for (String dbPrefix : dbPrefixesList) {
            // regionMatches with ignoreCase=true is a case-insensitive startsWith that
            // does not depend on the default locale (unlike toLowerCase()).
            if (database.getName().regionMatches(true, 0, dbPrefix, 0, dbPrefix.length())) {
                dBsToExportList.add(database);
                break; // one matching prefix is enough; avoid adding the database twice
            }
        }
    }
    System.out.printf("Number of databases in Glue Catalog: %d, number of databases to be exported: %d \n", dBList.size(), dBsToExportList.size());
    return dBsToExportList;
}
}