diff --git a/pom.xml b/pom.xml
index 28cceb6..ff6824a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,8 +50,8 @@
1.18.1
provided
- 2.15.3-18.0
32.1.3-jre-18.0
+ 32.1.3-jre
1.18.30
1.0.12
3.5.4
@@ -145,16 +145,9 @@
- org.apache.flink
- flink-shaded-jackson
- ${flink.shaded.jackson.version}
- ${flink.scope}
-
-
- org.apache.flink
- flink-shaded-guava
- ${flink.shaded.guava.version}
- ${flink.scope}
+ com.google.guava
+ guava
+ ${guava.version}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/SakuraCatalog.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/SakuraCatalog.java
index 8b029b4..b44cef1 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/SakuraCatalog.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/SakuraCatalog.java
@@ -27,6 +27,7 @@
import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
import cn.sliew.sakura.catalog.service.impl.CatalogServiceImpl;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import cn.sliew.sakura.dao.util.MybatisUtil;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.table.catalog.*;
@@ -73,19 +74,19 @@ public void close() throws CatalogException {
@Override
public List listDatabases() throws CatalogException {
- return catalogService.listDatabases(getName()).stream().map(CatalogDatabaseDTO::getName).collect(Collectors.toList());
+ return catalogService.listDatabases(CatalogType.FLINK, getName()).stream().map(CatalogDatabaseDTO::getName).collect(Collectors.toList());
}
@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
- Optional optional = catalogService.getDatabase(getName(), databaseName);
+ Optional optional = catalogService.getDatabase(CatalogType.FLINK, getName(), databaseName);
return optional.map(CatalogDatabaseFactory::toDatabase)
.orElseThrow(() -> new DatabaseNotExistException(getName(), databaseName));
}
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
- return catalogService.databaseExists(getName(), databaseName);
+ return catalogService.databaseExists(CatalogType.FLINK, getName(), databaseName);
}
@Override
@@ -102,7 +103,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
}
}
- if (!cascade && !catalogService.isDatabaseEmpty(getName(), name)) {
+ if (!cascade && !catalogService.isDatabaseEmpty(CatalogType.FLINK, getName(), name)) {
throw new DatabaseNotEmptyException(getName(), name);
}
@@ -113,7 +114,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
} catch (TableNotExistException ignored) {
}
});
- catalogService.deleteDatabase(getName(), name);
+ catalogService.deleteDatabase(CatalogType.FLINK, getName(), name);
} catch (DatabaseNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
@@ -132,7 +133,7 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep
if (databaseExists(databaseName) == false) {
throw new DatabaseNotExistException(getName(), databaseName);
}
- return catalogService.listTables(getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
+ return catalogService.listTables(CatalogType.FLINK, getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
}
@Override
@@ -140,15 +141,15 @@ public List listViews(String databaseName) throws DatabaseNotExistExcept
if (databaseExists(databaseName) == false) {
throw new DatabaseNotExistException(getName(), databaseName);
}
- return catalogService.listViews(getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
+ return catalogService.listViews(CatalogType.FLINK, getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
}
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
String database = tablePath.getDatabaseName();
String table = tablePath.getObjectName();
- Optional tableOptional = catalogService.getTable(getName(), database, table).map(CatalogTableFactory::toTable);
- Optional viewOptional = catalogService.getView(getName(), database, table).map(CatalogViewFactory::toView);
+ Optional tableOptional = catalogService.getTable(CatalogType.FLINK, getName(), database, table).map(CatalogTableFactory::toTable);
+ Optional viewOptional = catalogService.getView(CatalogType.FLINK, getName(), database, table).map(CatalogViewFactory::toView);
return tableOptional.or(() -> viewOptional).orElseThrow(() -> new TableNotExistException(getName(), tablePath));
}
@@ -156,8 +157,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
String database = tablePath.getDatabaseName();
String table = tablePath.getObjectName();
- boolean tableExists = catalogService.tableExists(getName(), database, table);
- boolean viewExists = catalogService.viewExists(getName(), database, table);
+ boolean tableExists = catalogService.tableExists(CatalogType.FLINK, getName(), database, table);
+ boolean viewExists = catalogService.viewExists(CatalogType.FLINK, getName(), database, table);
return tableExists || viewExists;
}
@@ -170,10 +171,10 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws Ta
CatalogBaseTable object = getTable(tablePath);
switch (object.getTableKind()) {
case TABLE:
- catalogService.deleteTable(getName(), database, table);
+ catalogService.deleteTable(CatalogType.FLINK, getName(), database, table);
break;
case VIEW:
- catalogService.deleteView(getName(), database, table);
+ catalogService.deleteView(CatalogType.FLINK, getName(), database, table);
break;
default:
throw new IllegalArgumentException("Unknown table type: " + object.getTableKind());
@@ -194,10 +195,10 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
CatalogBaseTable object = getTable(tablePath);
switch (object.getTableKind()) {
case TABLE:
- catalogService.renameTable(getName(), database, table, newTableName);
+ catalogService.renameTable(CatalogType.FLINK, getName(), database, table, newTableName);
break;
case VIEW:
- catalogService.renameView(getName(), database, table, newTableName);
+ catalogService.renameView(CatalogType.FLINK, getName(), database, table, newTableName);
break;
default:
throw new IllegalArgumentException("Unknown table type: " + object.getTableKind());
@@ -218,7 +219,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable catalogBaseTable,
CatalogTable catalogTable = (CatalogTable) catalogBaseTable;
CatalogTableDTO catalogTableDTO = CatalogTableFactory.fromResolvedTable(table, catalogTable);
try {
- catalogService.insertTable(getName(), database, catalogTableDTO);
+ catalogService.insertTable(CatalogType.FLINK, getName(), database, catalogTableDTO);
break;
} catch (TableAlreadyExistException e) {
if (!ignoreIfExists) {
@@ -229,7 +230,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable catalogBaseTable,
ResolvedCatalogView catalogView = (ResolvedCatalogView) catalogBaseTable;
CatalogTableDTO catalogViewDTO = CatalogViewFactory.fromResolvedView(table, catalogView);
try {
- catalogService.insertView(getName(), database, catalogViewDTO);
+ catalogService.insertView(CatalogType.FLINK, getName(), database, catalogViewDTO);
} catch (TableAlreadyExistException e) {
if (!ignoreIfExists) {
throw e;
@@ -256,12 +257,12 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
case TABLE:
CatalogTable catalogTable = (CatalogTable) newTable;
CatalogTableDTO catalogTableDTO = CatalogTableFactory.fromResolvedTable(table, catalogTable);
- catalogService.updateTable(getName(), database, catalogTableDTO);
+ catalogService.updateTable(CatalogType.FLINK, getName(), database, catalogTableDTO);
break;
case VIEW:
ResolvedCatalogView catalogView = (ResolvedCatalogView) newTable;
CatalogTableDTO catalogViewDTO = CatalogViewFactory.fromResolvedView(table, catalogView);
- catalogService.updateView(getName(), database, catalogViewDTO);
+ catalogService.updateView(CatalogType.FLINK, getName(), database, catalogViewDTO);
break;
default:
throw new IllegalArgumentException("Unknown table type: " + currentTable.getTableKind());
@@ -330,14 +331,14 @@ public List listFunctions(String dbName) throws DatabaseNotExistExceptio
if (databaseExists(dbName) == false) {
throw new DatabaseNotExistException(getName(), dbName);
}
- return catalogService.listFunctions(getName(), dbName).stream().map(CatalogFunctionDTO::getName).collect(Collectors.toList());
+ return catalogService.listFunctions(CatalogType.FLINK, getName(), dbName).stream().map(CatalogFunctionDTO::getName).collect(Collectors.toList());
}
@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
String database = functionPath.getDatabaseName();
String function = functionPath.getObjectName();
- return catalogService.getFunction(getName(), database, function)
+ return catalogService.getFunction(CatalogType.FLINK, getName(), database, function)
.map(CatalogFunctionFactory::toCatalogFunction)
.orElseThrow(() -> new FunctionNotExistException(getName(), functionPath));
}
@@ -346,7 +347,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
String database = functionPath.getDatabaseName();
String function = functionPath.getObjectName();
- return catalogService.functionExists(getName(), database, function);
+ return catalogService.functionExists(CatalogType.FLINK, getName(), database, function);
}
@Override
@@ -365,7 +366,7 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) thr
String function = functionPath.getObjectName();
try {
- catalogService.deleteFunction(getName(), database, function);
+ catalogService.deleteFunction(CatalogType.FLINK, getName(), database, function);
} catch (FunctionNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogFunctionFactory.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogFunctionFactory.java
index e452c7c..7350490 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogFunctionFactory.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogFunctionFactory.java
@@ -19,7 +19,7 @@
package cn.sliew.sakura.catalog.factory;
import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
-import cn.sliew.sakura.common.dict.CatalogFunctionLanguage;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogFunctionLanguage;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogSchemaFactory.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogSchemaFactory.java
index 1da0322..4dbc10b 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogSchemaFactory.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogSchemaFactory.java
@@ -22,8 +22,8 @@
import cn.sliew.sakura.catalog.service.dto.SchemaDTO;
import cn.sliew.sakura.catalog.service.dto.UniqueConstraintDTO;
import cn.sliew.sakura.catalog.service.dto.WatermarkDTO;
-import cn.sliew.sakura.common.dict.CatalogColumnType;
-import cn.sliew.sakura.common.dict.CatalogConstraintType;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogColumnType;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogConstraintType;
import cn.sliew.sakura.common.exception.Rethrower;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogTableFactory.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogTableFactory.java
index c549c48..1835240 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogTableFactory.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogTableFactory.java
@@ -20,7 +20,7 @@
import cn.sliew.sakura.catalog.SakuraCatalogTable;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
-import cn.sliew.sakura.common.dict.CatalogTableKind;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import org.apache.commons.lang3.EnumUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogViewFactory.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogViewFactory.java
index 9060e89..fbac5b5 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogViewFactory.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/factory/CatalogViewFactory.java
@@ -20,7 +20,7 @@
import cn.sliew.sakura.catalog.SakuraCatalogView;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
-import cn.sliew.sakura.common.dict.CatalogTableKind;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import org.apache.commons.lang3.EnumUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogService.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogService.java
index 496af96..f519ed6 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogService.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogService.java
@@ -21,6 +21,7 @@
import cn.sliew.sakura.catalog.service.dto.CatalogDatabaseDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import org.apache.flink.table.catalog.exceptions.*;
import java.util.List;
@@ -28,57 +29,57 @@
public interface CatalogService {
- List listDatabases(String catalog);
+ List listDatabases(CatalogType type, String catalog);
- Optional getDatabase(String catalog, String database);
+ Optional getDatabase(CatalogType type, String catalog, String database);
- boolean databaseExists(String catalog, String database);
+ boolean databaseExists(CatalogType type, String catalog, String database);
void insertDatabase(CatalogDatabaseDTO database) throws DatabaseAlreadyExistException;
void updateDatabase(CatalogDatabaseDTO database) throws DatabaseNotExistException;
- void deleteDatabase(String catalog, String database) throws DatabaseNotExistException;
+ void deleteDatabase(CatalogType type, String catalog, String database) throws DatabaseNotExistException;
- boolean isDatabaseEmpty(String catalog, String database);
+ boolean isDatabaseEmpty(CatalogType type, String catalog, String database);
- List listTables(String catalog, String database);
+ List listTables(CatalogType type, String catalog, String database);
- Optional getTable(String catalog, String database, String table);
+ Optional getTable(CatalogType type, String catalog, String database, String table);
- boolean tableExists(String catalog, String database, String table);
+ boolean tableExists(CatalogType type, String catalog, String database, String table);
- void insertTable(String catalog, String database, CatalogTableDTO table) throws DatabaseNotExistException, TableAlreadyExistException;
+ void insertTable(CatalogType type, String catalog, String database, CatalogTableDTO table) throws DatabaseNotExistException, TableAlreadyExistException;
- void updateTable(String catalog, String database, CatalogTableDTO table) throws TableNotExistException;
+ void updateTable(CatalogType type, String catalog, String database, CatalogTableDTO table) throws TableNotExistException;
- void renameTable(String catalog, String database, String currentName, String newName) throws TableAlreadyExistException, TableNotExistException;
+ void renameTable(CatalogType type, String catalog, String database, String currentName, String newName) throws TableAlreadyExistException, TableNotExistException;
- void deleteTable(String catalog, String database, String table) throws TableNotExistException;
+ void deleteTable(CatalogType type, String catalog, String database, String table) throws TableNotExistException;
- List listViews(String catalog, String database);
+ List listViews(CatalogType type, String catalog, String database);
- Optional getView(String catalog, String database, String view);
+ Optional getView(CatalogType type, String catalog, String database, String view);
- boolean viewExists(String catalog, String database, String view);
+ boolean viewExists(CatalogType type, String catalog, String database, String view);
- void insertView(String catalog, String database, CatalogTableDTO view) throws DatabaseNotExistException, TableAlreadyExistException;
+ void insertView(CatalogType type, String catalog, String database, CatalogTableDTO view) throws DatabaseNotExistException, TableAlreadyExistException;
- void updateView(String catalog, String database, CatalogTableDTO view) throws TableNotExistException;
+ void updateView(CatalogType type, String catalog, String database, CatalogTableDTO view) throws TableNotExistException;
- void renameView(String catalog, String database, String currentName, String newName) throws TableNotExistException, TableAlreadyExistException;
+ void renameView(CatalogType type, String catalog, String database, String currentName, String newName) throws TableNotExistException, TableAlreadyExistException;
- void deleteView(String catalog, String database, String viewName) throws TableNotExistException;
+ void deleteView(CatalogType type, String catalog, String database, String viewName) throws TableNotExistException;
- List listFunctions(String catalog, String database);
+ List listFunctions(CatalogType type, String catalog, String database);
- Optional getFunction(String catalog, String database, String function);
+ Optional getFunction(CatalogType type, String catalog, String database, String function);
- boolean functionExists(String catalog, String database, String function);
+ boolean functionExists(CatalogType type, String catalog, String database, String function);
- void insertFunction(String catalog, String database, CatalogFunctionDTO function) throws DatabaseNotExistException, FunctionAlreadyExistException;
+ void insertFunction(CatalogType type, String catalog, String database, CatalogFunctionDTO function) throws DatabaseNotExistException, FunctionAlreadyExistException;
- void updateFunction(String catalog, String database, CatalogFunctionDTO function) throws FunctionNotExistException;
+ void updateFunction(CatalogType type, String catalog, String database, CatalogFunctionDTO function) throws FunctionNotExistException;
- void deleteFunction(String catalog, String database, String functionName) throws FunctionNotExistException;
+ void deleteFunction(CatalogType type, String catalog, String database, String functionName) throws FunctionNotExistException;
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java
index a3c76f3..8c6680f 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java
@@ -19,17 +19,18 @@
package cn.sliew.sakura.catalog.service;
import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import java.util.List;
import java.util.Optional;
public interface CatalogStoreService {
- List list();
+ List list(CatalogType type);
- Optional get(String catalogName);
+ Optional get(CatalogType type, String catalogName);
void insert(CatalogStoreDTO dto);
- void delete(String catalogName);
+ void delete(CatalogType type, String catalogName);
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogDatabaseConvert.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogDatabaseConvert.java
index 2133c21..a27bca9 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogDatabaseConvert.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogDatabaseConvert.java
@@ -18,13 +18,13 @@
package cn.sliew.sakura.catalog.service.convert;
+import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.sakura.catalog.service.dto.CatalogDatabaseDTO;
import cn.sliew.sakura.common.exception.Rethrower;
import cn.sliew.sakura.common.util.CodecUtil;
-import cn.sliew.sakura.common.util.JacksonUtil;
import cn.sliew.sakura.dao.entity.CatalogDatabase;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java
index ef0e110..f25f62b 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java
@@ -18,14 +18,14 @@
package cn.sliew.sakura.catalog.service.convert;
+import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
import cn.sliew.sakura.common.exception.Rethrower;
import cn.sliew.sakura.common.util.CodecUtil;
-import cn.sliew.sakura.common.util.JacksonUtil;
import cn.sliew.sakura.dao.entity.CatalogStore;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogTableConvert.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogTableConvert.java
index adb6ae1..33f3bbe 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogTableConvert.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogTableConvert.java
@@ -18,14 +18,14 @@
package cn.sliew.sakura.catalog.service.convert;
+import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
import cn.sliew.sakura.catalog.service.dto.SchemaDTO;
import cn.sliew.sakura.common.exception.Rethrower;
import cn.sliew.sakura.common.util.CodecUtil;
-import cn.sliew.sakura.common.util.JacksonUtil;
import cn.sliew.sakura.dao.entity.CatalogTable;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import java.util.Map;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/Util.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/Util.java
index 32f8188..2a5cc74 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/Util.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/Util.java
@@ -28,12 +28,14 @@ static void copyProperties(BaseDTO source, BaseDO dest) {
dest.setId(source.getId());
dest.setCreateTime(source.getCreateTime());
dest.setUpdateTime(source.getUpdateTime());
+ dest.setDeleteTime(source.getDeleteTime());
}
static void copyProperties(BaseDO source, BaseDTO dest) {
dest.setId(source.getId());
dest.setCreateTime(source.getCreateTime());
dest.setUpdateTime(source.getUpdateTime());
+ dest.setDeleteTime(source.getDeleteTime());
}
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/BaseDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/BaseDTO.java
index fadb8cb..a9c1911 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/BaseDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/BaseDTO.java
@@ -34,4 +34,6 @@ public class BaseDTO implements Serializable {
private Date updateTime;
+ private Date deleteTime;
+
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogDatabaseDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogDatabaseDTO.java
index 128e0bc..5ae69bb 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogDatabaseDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogDatabaseDTO.java
@@ -18,6 +18,7 @@
package cn.sliew.sakura.catalog.service.dto;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import lombok.Data;
import java.util.Map;
@@ -28,6 +29,7 @@
@Data
public class CatalogDatabaseDTO extends BaseDTO {
+ private CatalogType type;
private String catalog;
private String name;
private Map properties;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogFunctionDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogFunctionDTO.java
index 0cfbd80..c1b68c8 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogFunctionDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogFunctionDTO.java
@@ -18,7 +18,7 @@
package cn.sliew.sakura.catalog.service.dto;
-import cn.sliew.sakura.common.dict.CatalogFunctionLanguage;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogFunctionLanguage;
import lombok.Data;
/**
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java
index 9534bd5..907870b 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java
@@ -18,12 +18,14 @@
package cn.sliew.sakura.catalog.service.dto;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import lombok.Data;
import org.apache.flink.configuration.Configuration;
@Data
public class CatalogStoreDTO extends BaseDTO {
+ private CatalogType type;
private String catalogName;
private Configuration configuration;
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogTableDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogTableDTO.java
index e8acfe7..06a91a9 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogTableDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogTableDTO.java
@@ -18,7 +18,7 @@
package cn.sliew.sakura.catalog.service.dto;
-import cn.sliew.sakura.common.dict.CatalogTableKind;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import lombok.Data;
import java.util.Map;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/ColumnDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/ColumnDTO.java
index 6c4aa87..3e890df 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/ColumnDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/ColumnDTO.java
@@ -18,7 +18,7 @@
package cn.sliew.sakura.catalog.service.dto;
-import cn.sliew.sakura.common.dict.CatalogColumnType;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogColumnType;
import lombok.Data;
/**
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/UniqueConstraintDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/UniqueConstraintDTO.java
index 2206edb..1a5bc8c 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/UniqueConstraintDTO.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/UniqueConstraintDTO.java
@@ -18,7 +18,7 @@
package cn.sliew.sakura.catalog.service.dto;
-import cn.sliew.sakura.common.dict.CatalogConstraintType;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogConstraintType;
import lombok.Data;
import java.util.List;
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogServiceImpl.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogServiceImpl.java
index 984d7a5..7224bbc 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogServiceImpl.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogServiceImpl.java
@@ -25,7 +25,8 @@
import cn.sliew.sakura.catalog.service.dto.CatalogDatabaseDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
-import cn.sliew.sakura.common.dict.CatalogTableKind;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import cn.sliew.sakura.dao.entity.CatalogDatabase;
import cn.sliew.sakura.dao.entity.CatalogFunction;
import cn.sliew.sakura.dao.entity.CatalogTable;
@@ -39,7 +40,7 @@
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
-import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
import java.util.Optional;
@@ -52,11 +53,13 @@ public CatalogServiceImpl(SqlSessionFactory sqlSessionFactory) {
}
@Override
- public List listDatabases(String catalog) {
+ public List listDatabases(CatalogType type, String catalog) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogDatabaseMapper catalogDatabaseMapper = sqlSession.getMapper(CatalogDatabaseMapper.class);
LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogDatabase.class)
+ .eq(CatalogDatabase::getType, type)
.eq(CatalogDatabase::getCatalog, catalog)
+ .isNull(CatalogDatabase::getDeleteTime)
.orderByAsc(CatalogDatabase::getName);
List databases = catalogDatabaseMapper.selectList(queryWrapper);
return CatalogDatabaseConvert.INSTANCE.toDto(databases);
@@ -64,7 +67,7 @@ public List listDatabases(String catalog) {
}
@Override
- public Optional getDatabase(String catalog, String database) {
+ public Optional getDatabase(CatalogType type, String catalog, String database) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogDatabaseMapper catalogDatabaseMapper = sqlSession.getMapper(CatalogDatabaseMapper.class);
LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogDatabase.class)
@@ -79,8 +82,8 @@ public Optional getDatabase(String catalog, String database)
}
@Override
- public boolean databaseExists(String catalog, String database) {
- Optional optional = getDatabase(catalog, database);
+ public boolean databaseExists(CatalogType type, String catalog, String database) {
+ Optional optional = getDatabase(type, catalog, database);
return optional.isPresent();
}
@@ -88,7 +91,7 @@ public boolean databaseExists(String catalog, String database) {
public void insertDatabase(CatalogDatabaseDTO database) throws DatabaseAlreadyExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogDatabaseMapper catalogDatabaseMapper = sqlSession.getMapper(CatalogDatabaseMapper.class);
- if (databaseExists(database.getCatalog(), database.getName())) {
+ if (databaseExists(database.getType(), database.getCatalog(), database.getName())) {
throw new DatabaseAlreadyExistException(database.getCatalog(), database.getName());
}
CatalogDatabase record = CatalogDatabaseConvert.INSTANCE.toDo(database);
@@ -101,7 +104,7 @@ public void insertDatabase(CatalogDatabaseDTO database) throws DatabaseAlreadyEx
public void updateDatabase(CatalogDatabaseDTO database) throws DatabaseNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogDatabaseMapper catalogDatabaseMapper = sqlSession.getMapper(CatalogDatabaseMapper.class);
- if (databaseExists(database.getCatalog(), database.getName()) == false) {
+ if (databaseExists(database.getType(), database.getCatalog(), database.getName()) == false) {
throw new DatabaseNotExistException(database.getCatalog(), database.getName());
}
CatalogDatabase record = CatalogDatabaseConvert.INSTANCE.toDo(database);
@@ -111,61 +114,69 @@ public void updateDatabase(CatalogDatabaseDTO database) throws DatabaseNotExistE
}
@Override
- public void deleteDatabase(String catalog, String database) throws DatabaseNotExistException {
+ public void deleteDatabase(CatalogType type, String catalog, String database) throws DatabaseNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogDatabaseMapper catalogDatabaseMapper = sqlSession.getMapper(CatalogDatabaseMapper.class);
- if (databaseExists(catalog, database) == false) {
+ if (databaseExists(type, catalog, database) == false) {
throw new DatabaseNotExistException(catalog, database);
}
LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogDatabase.class)
+ .eq(CatalogDatabase::getType, type)
.eq(CatalogDatabase::getCatalog, catalog)
- .eq(CatalogDatabase::getName, database);
- catalogDatabaseMapper.delete(queryWrapper);
+ .eq(CatalogDatabase::getName, database)
+ .isNull(CatalogDatabase::getDeleteTime);
+
+ CatalogDatabase record = new CatalogDatabase();
+ record.setType(type);
+ record.setCatalog(catalog);
+ record.setName(database);
+ record.setDeleteTime(new Date());
+ catalogDatabaseMapper.update(record, queryWrapper);
sqlSession.commit();
}
}
@Override
- public boolean isDatabaseEmpty(String catalog, String database) {
+ public boolean isDatabaseEmpty(CatalogType type, String catalog, String database) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
CatalogFunctionMapper catalogFunctionMapper = sqlSession.getMapper(CatalogFunctionMapper.class);
- int tableCount = catalogTableMapper.countByDatabase(catalog, database, CatalogTableKind.TABLE);
- int functionCount = catalogFunctionMapper.countByDatabase(catalog, database);
+ int tableCount = catalogTableMapper.countByDatabase(type, catalog, database, CatalogTableKind.TABLE);
+ int functionCount = catalogFunctionMapper.countByDatabase(type, catalog, database);
return tableCount != 0 || functionCount != 0;
}
}
@Override
- public List listTables(String catalog, String database) {
+ public List listTables(CatalogType type, String catalog, String database) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- List records = catalogTableMapper.selectByDatabase(catalog, database, CatalogTableKind.TABLE);
+ List records = catalogTableMapper.selectByDatabase(type, catalog, database, CatalogTableKind.TABLE);
return CatalogTableConvert.INSTANCE.toDto(records);
}
}
@Override
- public Optional getTable(String catalog, String database, String table) {
+ public Optional getTable(CatalogType type, String catalog, String database, String table) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- Optional optional = catalogTableMapper.selectByName(catalog, database, CatalogTableKind.TABLE, table);
+ Optional optional = catalogTableMapper.selectByName(type, catalog, database, CatalogTableKind.TABLE, table);
return optional.map(record -> CatalogTableConvert.INSTANCE.toDto(record));
}
}
@Override
- public boolean tableExists(String catalog, String database, String table) {
- Optional optional = getTable(catalog, database, table);
+ public boolean tableExists(CatalogType type, String catalog, String database, String table) {
+ Optional optional = getTable(type, catalog, database, table);
return optional.isPresent();
}
@Override
- public void insertTable(String catalog, String database, CatalogTableDTO table) throws DatabaseNotExistException, TableAlreadyExistException {
+ public void insertTable(CatalogType type, String catalog, String database, CatalogTableDTO table) throws DatabaseNotExistException, TableAlreadyExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- CatalogDatabaseDTO catalogDatabaseDTO = getDatabase(catalog, database).orElseThrow(() -> new DatabaseNotExistException(catalog, database));
- if (tableExists(catalog, database, table.getName())) {
+ CatalogDatabaseDTO catalogDatabaseDTO = getDatabase(type, catalog, database).orElseThrow(() -> new DatabaseNotExistException(catalog, database));
+ if (tableExists(type, catalog, database, table.getName())) {
throw new TableAlreadyExistException(catalog, new ObjectPath(database, table.getName()));
}
CatalogTable record = CatalogTableConvert.INSTANCE.toDo(table);
@@ -176,10 +187,10 @@ public void insertTable(String catalog, String database, CatalogTableDTO table)
}
@Override
- public void updateTable(String catalog, String database, CatalogTableDTO table) throws TableNotExistException {
+ public void updateTable(CatalogType type, String catalog, String database, CatalogTableDTO table) throws TableNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- if (tableExists(catalog, database, table.getName()) == false) {
+ if (tableExists(type, catalog, database, table.getName()) == false) {
throw new TableNotExistException(catalog, new ObjectPath(database, table.getName()));
}
CatalogTable record = CatalogTableConvert.INSTANCE.toDo(table);
@@ -189,11 +200,11 @@ public void updateTable(String catalog, String database, CatalogTableDTO table)
}
@Override
- public void renameTable(String catalog, String database, String currentName, String newName) throws TableAlreadyExistException, TableNotExistException {
+ public void renameTable(CatalogType type, String catalog, String database, String currentName, String newName) throws TableAlreadyExistException, TableNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- CatalogTableDTO catalogTableDTO = getTable(catalog, database, currentName).orElseThrow(() -> new TableNotExistException(catalog, new ObjectPath(database, currentName)));
- if (tableExists(catalog, database, newName)) {
+ CatalogTableDTO catalogTableDTO = getTable(type, catalog, database, currentName).orElseThrow(() -> new TableNotExistException(catalog, new ObjectPath(database, currentName)));
+ if (tableExists(type, catalog, database, newName)) {
throw new TableAlreadyExistException(catalog, new ObjectPath(database, newName));
}
CatalogTable record = CatalogTableConvert.INSTANCE.toDo(catalogTableDTO);
@@ -204,46 +215,47 @@ public void renameTable(String catalog, String database, String currentName, Str
}
@Override
- public void deleteTable(String catalog, String database, String table) throws TableNotExistException {
+ public void deleteTable(CatalogType type, String catalog, String database, String table) throws TableNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- CatalogTableDTO catalogTableDTO = getTable(catalog, database, table).orElseThrow(() -> new TableNotExistException(catalog, new ObjectPath(database, table)));
- CatalogTable record = CatalogTableConvert.INSTANCE.toDo(catalogTableDTO);
- catalogTableMapper.deleteById(record.getId());
+ if (!tableExists(type, catalog, database, table)) {
+ throw new TableNotExistException(catalog, new ObjectPath(database, table));
+ }
+ catalogTableMapper.deleteByName(type, catalog, database, CatalogTableKind.TABLE, table);
sqlSession.commit();
}
}
@Override
- public List listViews(String catalog, String database) {
+ public List listViews(CatalogType type, String catalog, String database) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- List views = catalogTableMapper.selectByDatabase(catalog, database, CatalogTableKind.VIEW);
+ List views = catalogTableMapper.selectByDatabase(type, catalog, database, CatalogTableKind.VIEW);
return CatalogTableConvert.INSTANCE.toDto(views);
}
}
@Override
- public Optional getView(String catalog, String database, String view) {
+ public Optional getView(CatalogType type, String catalog, String database, String view) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- Optional optional = catalogTableMapper.selectByName(catalog, database, CatalogTableKind.VIEW, view);
+ Optional optional = catalogTableMapper.selectByName(type, catalog, database, CatalogTableKind.VIEW, view);
return optional.map(record -> CatalogTableConvert.INSTANCE.toDto(record));
}
}
@Override
- public boolean viewExists(String catalog, String database, String view) {
- Optional optional = getView(catalog, database, view);
+ public boolean viewExists(CatalogType type, String catalog, String database, String view) {
+ Optional optional = getView(type, catalog, database, view);
return optional.isPresent();
}
@Override
- public void insertView(String catalog, String database, CatalogTableDTO view) throws DatabaseNotExistException, TableAlreadyExistException {
+ public void insertView(CatalogType type, String catalog, String database, CatalogTableDTO view) throws DatabaseNotExistException, TableAlreadyExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- CatalogDatabaseDTO catalogDatabaseDTO = getDatabase(catalog, database).orElseThrow(() -> new DatabaseNotExistException(catalog, database));
- if (viewExists(catalog, database, view.getName())) {
+ CatalogDatabaseDTO catalogDatabaseDTO = getDatabase(type, catalog, database).orElseThrow(() -> new DatabaseNotExistException(catalog, database));
+ if (viewExists(type, catalog, database, view.getName())) {
throw new TableAlreadyExistException(catalog, new ObjectPath(database, view.getName()));
}
CatalogTable record = CatalogTableConvert.INSTANCE.toDo(view);
@@ -254,10 +266,10 @@ public void insertView(String catalog, String database, CatalogTableDTO view) th
}
@Override
- public void updateView(String catalog, String database, CatalogTableDTO view) throws TableNotExistException {
+ public void updateView(CatalogType type, String catalog, String database, CatalogTableDTO view) throws TableNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- if (viewExists(catalog, database, view.getName()) == false) {
+ if (viewExists(type, catalog, database, view.getName()) == false) {
throw new TableNotExistException(catalog, new ObjectPath(database, view.getName()));
}
CatalogTable record = CatalogTableConvert.INSTANCE.toDo(view);
@@ -267,11 +279,11 @@ public void updateView(String catalog, String database, CatalogTableDTO view) th
}
@Override
- public void renameView(String catalog, String database, String currentName, String newName) throws TableNotExistException, TableAlreadyExistException {
+ public void renameView(CatalogType type, String catalog, String database, String currentName, String newName) throws TableNotExistException, TableAlreadyExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- CatalogTableDTO catalogViewDTO = getView(catalog, database, currentName).orElseThrow(() -> new TableNotExistException(catalog, new ObjectPath(database, currentName)));
- if (viewExists(catalog, database, newName)) {
+ CatalogTableDTO catalogViewDTO = getView(type, catalog, database, currentName).orElseThrow(() -> new TableNotExistException(catalog, new ObjectPath(database, currentName)));
+ if (viewExists(type, catalog, database, newName)) {
throw new TableAlreadyExistException(catalog, new ObjectPath(database, newName));
}
CatalogTable record = CatalogTableConvert.INSTANCE.toDo(catalogViewDTO);
@@ -282,46 +294,47 @@ public void renameView(String catalog, String database, String currentName, Stri
}
@Override
- public void deleteView(String catalog, String database, String viewName) throws TableNotExistException {
+ public void deleteView(CatalogType type, String catalog, String database, String viewName) throws TableNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogTableMapper catalogTableMapper = sqlSession.getMapper(CatalogTableMapper.class);
- CatalogTableDTO catalogViewDTO = getView(catalog, database, viewName).orElseThrow(() -> new TableNotExistException(catalog, new ObjectPath(database, viewName)));
- CatalogTable record = CatalogTableConvert.INSTANCE.toDo(catalogViewDTO);
- catalogTableMapper.deleteById(record.getId());
+ if (!viewExists(type, catalog, database, viewName)) {
+ throw new TableNotExistException(catalog, new ObjectPath(database, viewName));
+ }
+ catalogTableMapper.deleteByName(type, catalog, database, CatalogTableKind.VIEW, viewName);
sqlSession.commit();
}
}
@Override
- public List listFunctions(String catalog, String database) {
+ public List listFunctions(CatalogType type, String catalog, String database) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogFunctionMapper catalogFunctionMapper = sqlSession.getMapper(CatalogFunctionMapper.class);
- List catalogFunctions = catalogFunctionMapper.selectByDatabase(catalog, database);
+ List catalogFunctions = catalogFunctionMapper.selectByDatabase(type, catalog, database);
return CatalogFunctionConvert.INSTANCE.toDto(catalogFunctions);
}
}
@Override
- public Optional getFunction(String catalog, String database, String function) {
+ public Optional getFunction(CatalogType type, String catalog, String database, String function) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogFunctionMapper catalogFunctionMapper = sqlSession.getMapper(CatalogFunctionMapper.class);
- Optional optional = catalogFunctionMapper.selectByName(catalog, database, function);
+ Optional optional = catalogFunctionMapper.selectByName(type, catalog, database, function);
return optional.map(record -> CatalogFunctionConvert.INSTANCE.toDto(record));
}
}
@Override
- public boolean functionExists(String catalog, String database, String function) {
- Optional optional = getFunction(catalog, database, function);
+ public boolean functionExists(CatalogType type, String catalog, String database, String function) {
+ Optional optional = getFunction(type, catalog, database, function);
return optional.isPresent();
}
@Override
- public void insertFunction(String catalog, String database, CatalogFunctionDTO function) throws DatabaseNotExistException, FunctionAlreadyExistException {
+ public void insertFunction(CatalogType type, String catalog, String database, CatalogFunctionDTO function) throws DatabaseNotExistException, FunctionAlreadyExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogFunctionMapper catalogFunctionMapper = sqlSession.getMapper(CatalogFunctionMapper.class);
- CatalogDatabaseDTO catalogDatabaseDTO = getDatabase(catalog, database).orElseThrow(() -> new DatabaseNotExistException(catalog, database));
- if (functionExists(catalog, database, function.getName())) {
+ CatalogDatabaseDTO catalogDatabaseDTO = getDatabase(type, catalog, database).orElseThrow(() -> new DatabaseNotExistException(catalog, database));
+ if (functionExists(type, catalog, database, function.getName())) {
throw new FunctionAlreadyExistException(catalog, new ObjectPath(database, function.getName()));
}
CatalogFunction record = CatalogFunctionConvert.INSTANCE.toDo(function);
@@ -332,10 +345,10 @@ public void insertFunction(String catalog, String database, CatalogFunctionDTO f
}
@Override
- public void updateFunction(String catalog, String database, CatalogFunctionDTO function) throws FunctionNotExistException {
+ public void updateFunction(CatalogType type, String catalog, String database, CatalogFunctionDTO function) throws FunctionNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogFunctionMapper catalogFunctionMapper = sqlSession.getMapper(CatalogFunctionMapper.class);
- if (functionExists(catalog, database, function.getName()) == false) {
+ if (functionExists(type, catalog, database, function.getName()) == false) {
throw new FunctionNotExistException(catalog, new ObjectPath(database, function.getName()));
}
CatalogFunction record = CatalogFunctionConvert.INSTANCE.toDo(function);
@@ -345,12 +358,13 @@ public void updateFunction(String catalog, String database, CatalogFunctionDTO f
}
@Override
- public void deleteFunction(String catalog, String database, String functionName) throws FunctionNotExistException {
+ public void deleteFunction(CatalogType type, String catalog, String database, String functionName) throws FunctionNotExistException {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogFunctionMapper catalogFunctionMapper = sqlSession.getMapper(CatalogFunctionMapper.class);
- CatalogFunctionDTO catalogFunctionDTO = getFunction(catalog, database, functionName).orElseThrow(() -> new FunctionNotExistException(catalog, new ObjectPath(database, functionName)));
- CatalogFunction record = CatalogFunctionConvert.INSTANCE.toDo(catalogFunctionDTO);
- catalogFunctionMapper.deleteById(record.getId());
+ if (!functionExists(type, catalog, database, functionName)) {
+ throw new FunctionNotExistException(catalog, new ObjectPath(database, functionName));
+ }
+ catalogFunctionMapper.deleteByName(type, catalog, database, functionName);
sqlSession.commit();
}
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java
index c0173f6..c48c526 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java
@@ -21,6 +21,7 @@
import cn.sliew.sakura.catalog.service.CatalogStoreService;
import cn.sliew.sakura.catalog.service.convert.CatalogStoreConvert;
import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import cn.sliew.sakura.dao.entity.CatalogStore;
import cn.sliew.sakura.dao.mapper.CatalogStoreMapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@@ -29,6 +30,7 @@
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
+import java.util.Date;
import java.util.List;
import java.util.Optional;
@@ -41,10 +43,12 @@ public CatalogStoreServiceImpl(SqlSessionFactory sqlSessionFactory) {
}
@Override
- public List list() {
+ public List list(CatalogType type) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogStore.class)
+ .eq(CatalogStore::getType, type)
+ .isNull(CatalogStore::getDeleteTime)
.orderByAsc(CatalogStore::getCatalogName);
List catalogs = catalogStoreMapper.selectList(queryWrapper);
return CatalogStoreConvert.INSTANCE.toDto(catalogs);
@@ -52,11 +56,13 @@ public List list() {
}
@Override
- public Optional get(String catalogName) {
+ public Optional get(CatalogType type, String catalogName) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogStore.class)
- .eq(CatalogStore::getCatalogName, catalogName);
+ .eq(CatalogStore::getType, type)
+ .eq(CatalogStore::getCatalogName, catalogName)
+ .isNull(CatalogStore::getDeleteTime);
CatalogStore catalog = catalogStoreMapper.selectOne(queryWrapper);
return Optional.ofNullable(catalog).map(CatalogStoreConvert.INSTANCE::toDto);
}
@@ -73,12 +79,18 @@ public void insert(CatalogStoreDTO dto) {
}
@Override
- public void delete(String catalogName) {
+ public void delete(CatalogType type, String catalogName) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(CatalogStore.class)
- .eq(CatalogStore::getCatalogName, catalogName);
- catalogStoreMapper.delete(updateWrapper);
+ .eq(CatalogStore::getType, type)
+ .eq(CatalogStore::getCatalogName, catalogName)
+ .isNull(CatalogStore::getDeleteTime);
+ CatalogStore record = new CatalogStore();
+ record.setType(type);
+ record.setCatalogName(catalogName);
+ record.setDeleteTime(new Date());
+ catalogStoreMapper.update(record, updateWrapper);
sqlSession.commit();
}
}
diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java
index bb594e4..6236359 100644
--- a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java
+++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java
@@ -21,6 +21,7 @@
import cn.sliew.sakura.catalog.service.CatalogStoreService;
import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
import cn.sliew.sakura.catalog.service.impl.CatalogStoreServiceImpl;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import cn.sliew.sakura.dao.util.MybatisUtil;
@@ -70,6 +71,7 @@ public void storeCatalog(String catalogName, CatalogDescriptor catalog) throws C
throw new CatalogException(String.format("Catalog %s's store is already exist.", catalogName));
}
CatalogStoreDTO dto = new CatalogStoreDTO();
+ dto.setType(CatalogType.FLINK);
dto.setCatalogName(catalog.getCatalogName());
dto.setConfiguration(catalog.getConfiguration());
catalogStoreService.insert(dto);
@@ -80,17 +82,18 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws
if (contains(catalogName) == false && ignoreIfNotExists == false) {
throw new CatalogException(String.format("Catalog %s's store is not exist", catalogName));
}
- catalogStoreService.delete(catalogName);
+
+ catalogStoreService.delete(CatalogType.FLINK, catalogName);
}
@Override
public Optional getCatalog(String catalogName) throws CatalogException {
- return catalogStoreService.get(catalogName).map(dto -> CatalogDescriptor.of(dto.getCatalogName(), dto.getConfiguration()));
+ return catalogStoreService.get(CatalogType.FLINK, catalogName).map(dto -> CatalogDescriptor.of(dto.getCatalogName(), dto.getConfiguration()));
}
@Override
public Set listCatalogs() throws CatalogException {
- return catalogStoreService.list().stream().map(CatalogStoreDTO::getCatalogName).collect(Collectors.toSet());
+ return catalogStoreService.list(CatalogType.FLINK).stream().map(CatalogStoreDTO::getCatalogName).collect(Collectors.toSet());
}
@Override
diff --git a/sakura-common/pom.xml b/sakura-common/pom.xml
index 4b323dc..19f91dd 100644
--- a/sakura-common/pom.xml
+++ b/sakura-common/pom.xml
@@ -29,27 +29,19 @@
sakura-common
-
- com.baomidou
- mybatis-plus-annotation
-
-
cn.sliew
milky-common
- org.apache.flink
- flink-table-common
-
-
- org.apache.flink
- flink-shaded-jackson
+ com.google.guava
+ guava
+
- org.apache.flink
- flink-shaded-guava
+ com.baomidou
+ mybatis-plus-annotation
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/CatalogType.java b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/CatalogType.java
new file mode 100644
index 0000000..c9e0d48
--- /dev/null
+++ b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/CatalogType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.sliew.sakura.common.dict.catalog;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.Getter;
+
+@Getter
+public enum CatalogType {
+
+ FLINK("Flink", "Flink"),
+ SeaTunnel("SeaTunnel", "SeaTunnel"),
+ GENERIC("generic", "generic"),
+ ;
+
+ @JsonValue
+ @EnumValue
+ private String value;
+ private String label;
+
+ CatalogType(String value, String label) {
+ this.value = value;
+ this.label = label;
+ }
+}
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogColumnType.java b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogColumnType.java
similarity index 91%
rename from sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogColumnType.java
rename to sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogColumnType.java
index 0567346..f3eaaab 100644
--- a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogColumnType.java
+++ b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogColumnType.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.sakura.common.dict;
+package cn.sliew.sakura.common.dict.catalog.flink;
import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
@Getter
public enum CatalogColumnType {
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogConstraintType.java b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogConstraintType.java
similarity index 91%
rename from sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogConstraintType.java
rename to sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogConstraintType.java
index 24793b2..84e0bb7 100644
--- a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogConstraintType.java
+++ b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogConstraintType.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.sakura.common.dict;
+package cn.sliew.sakura.common.dict.catalog.flink;
import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
@Getter
public enum CatalogConstraintType {
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogFunctionLanguage.java b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogFunctionLanguage.java
similarity index 91%
rename from sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogFunctionLanguage.java
rename to sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogFunctionLanguage.java
index 84f71b9..4c90832 100644
--- a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogFunctionLanguage.java
+++ b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogFunctionLanguage.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.sakura.common.dict;
+package cn.sliew.sakura.common.dict.catalog.flink;
import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
@Getter
public enum CatalogFunctionLanguage {
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogTableKind.java b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogTableKind.java
similarity index 90%
rename from sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogTableKind.java
rename to sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogTableKind.java
index 78a2c94..470dfcb 100644
--- a/sakura-common/src/main/java/cn/sliew/sakura/common/dict/CatalogTableKind.java
+++ b/sakura-common/src/main/java/cn/sliew/sakura/common/dict/catalog/flink/CatalogTableKind.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package cn.sliew.sakura.common.dict;
+package cn.sliew.sakura.common.dict.catalog.flink;
import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.fasterxml.jackson.annotation.JsonValue;
import lombok.Getter;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
@Getter
public enum CatalogTableKind {
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/exception/Rethrower.java b/sakura-common/src/main/java/cn/sliew/sakura/common/exception/Rethrower.java
index 042d04c..fe48dd2 100644
--- a/sakura-common/src/main/java/cn/sliew/sakura/common/exception/Rethrower.java
+++ b/sakura-common/src/main/java/cn/sliew/sakura/common/exception/Rethrower.java
@@ -18,18 +18,14 @@
package cn.sliew.sakura.common.exception;
-import lombok.extern.slf4j.Slf4j;
-
import java.util.concurrent.Callable;
import java.util.function.Function;
-import static org.apache.flink.shaded.guava31.com.google.common.base.Preconditions.checkNotNull;
-
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Rethrowing checked exceptions as unchecked ones. Eh, it is sometimes useful...
*/
-@Slf4j
public enum Rethrower {
;
@@ -129,8 +125,8 @@ private static void castCheckedToRuntime(final Procedure voidCallable, final Fun
public static void swallow(final Procedure procedure) {
try {
procedure.call();
- } catch (final Exception e) {
- log.error("Swallowed error.", e);
+ } catch (Exception unused) {
+
}
}
diff --git a/sakura-common/src/main/java/cn/sliew/sakura/common/util/JacksonUtil.java b/sakura-common/src/main/java/cn/sliew/sakura/common/util/JacksonUtil.java
deleted file mode 100644
index 20d77ce..0000000
--- a/sakura-common/src/main/java/cn/sliew/sakura/common/util/JacksonUtil.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package cn.sliew.sakura.common.util;
-
-import cn.sliew.sakura.common.exception.Rethrower;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.CollectionType;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * jackson utility class.
- */
-@Slf4j
-public class JacksonUtil {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- static {
- OBJECT_MAPPER.registerModule(new JavaTimeModule());
- OBJECT_MAPPER.registerModule(new Jdk8Module());
- OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private JacksonUtil() {
- throw new AssertionError("No instances intended");
- }
-
- /**
- * serialize object to json string.
- */
- public static String toJsonString(Object object) {
- try {
- return OBJECT_MAPPER.writeValueAsString(object);
- } catch (JsonProcessingException e) {
- log.error("json failed when serializing object: {}", object, e);
- Rethrower.throwAs(e);
- return null;
- }
- }
-
- /**
- * deserialize json string to target specified by {@link Class}.
- */
- public static T parseJsonString(String json, Class clazz) {
- try {
- return OBJECT_MAPPER.readValue(json, clazz);
- } catch (JsonProcessingException e) {
- log.error("json failed when deserializing clazz: {}, json: {}", clazz.getName(), json, e);
- Rethrower.throwAs(e);
- return null;
- }
- }
-
- /**
- * deserialize json string to target specified by {@link TypeReference}.
- * {@link TypeReference} indicate type generics.
- */
- public static T parseJsonString(String json, TypeReference reference) {
- try {
- return OBJECT_MAPPER.readValue(json, reference);
- } catch (JsonProcessingException e) {
- log.error("json failed when deserializing clazz: {}, json: {}", reference.getType().getTypeName(), json, e);
- Rethrower.throwAs(e);
- return null;
- }
- }
-
- /**
- * deserialize json string to target specified by generic type.
- */
- public static T parseJsonString(String json, Class outerType, Class parameterClasses) {
- try {
- JavaType type = OBJECT_MAPPER.getTypeFactory().constructParametricType(outerType, parameterClasses);
- return OBJECT_MAPPER.readValue(json, type);
- } catch (JsonProcessingException e) {
- log.error("json failed when deserializing clazz: {}, json: {}", outerType.getTypeName(), json, e);
- Rethrower.throwAs(e);
- return null;
- }
- }
-
- public static List parseJsonArray(String json, Class clazz) {
- if (StringUtils.isBlank(json)) {
- return Collections.emptyList();
- }
-
- try {
- CollectionType listType = OBJECT_MAPPER.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
- return OBJECT_MAPPER.readValue(json, listType);
- } catch (Exception e) {
- log.error("json failed when deserializing clazz: {}, json: {}", clazz.getName(), json, e);
- }
-
- return Collections.emptyList();
- }
-
- public static ArrayNode createArrayNode() {
- return OBJECT_MAPPER.createArrayNode();
- }
-
- public static ObjectNode createObjectNode() {
- return OBJECT_MAPPER.createObjectNode();
- }
-
- public static JsonNode toJsonNode(Object obj) {
- return OBJECT_MAPPER.valueToTree(obj);
- }
-
- public static JsonNode toJsonNode(String json) {
- try {
- return OBJECT_MAPPER.readTree(json);
- } catch (JsonProcessingException e) {
- Rethrower.throwAs(e);
- return null;
- }
- }
-
- public static T toObject(JsonNode jsonNode, Class clazz) {
- return OBJECT_MAPPER.convertValue(jsonNode, clazz);
- }
-
- public static T toObject(JsonNode jsonNode, TypeReference typeReference) {
- return OBJECT_MAPPER.convertValue(jsonNode, typeReference);
- }
-
- public static boolean checkJsonValid(String json) {
- if (StringUtils.isBlank(json)) {
- return false;
- }
-
- try {
- OBJECT_MAPPER.readTree(json);
- return true;
- } catch (IOException ignored) {
- // just ignore
- }
-
- return false;
- }
-
- public static T deepCopy(S source, Class clazz) {
- try {
- JsonNode jsonNode = OBJECT_MAPPER.valueToTree(source);
- return OBJECT_MAPPER.treeToValue(jsonNode, clazz);
- } catch (JsonProcessingException e) {
- log.error("property deep copy failed! source: {}, target: {}", source, clazz.getName(), e);
- Rethrower.throwAs(e);
- return null;
- }
- }
-}
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/BaseDO.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/BaseDO.java
index df2e38b..94d3de0 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/BaseDO.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/BaseDO.java
@@ -40,4 +40,7 @@ public class BaseDO implements Serializable {
@TableField(value = "update_time", fill = FieldFill.INSERT_UPDATE)
private Date updateTime;
+
+ @TableField("delete_time")
+ private Date deleteTime;
}
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogDatabase.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogDatabase.java
index 7960999..dd07dda 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogDatabase.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogDatabase.java
@@ -18,6 +18,7 @@
package cn.sliew.sakura.dao.entity;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@@ -28,6 +29,9 @@ public class CatalogDatabase extends BaseDO {
private static final long serialVersionUID = 1L;
+ @TableField("`type`")
+ private CatalogType type;
+
@TableField("catalog")
private String catalog;
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogFunction.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogFunction.java
index 01fb217..5919e42 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogFunction.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogFunction.java
@@ -18,7 +18,7 @@
package cn.sliew.sakura.dao.entity;
-import cn.sliew.sakura.common.dict.CatalogFunctionLanguage;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogFunctionLanguage;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java
index d0d41bc..e382085 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java
@@ -18,6 +18,7 @@
package cn.sliew.sakura.dao.entity;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@@ -28,6 +29,9 @@ public class CatalogStore extends BaseDO {
private static final long serialVersionUID = 1L;
+ @TableField("`type`")
+ private CatalogType type;
+
@TableField("`catalog_name`")
private String catalogName;
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogTable.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogTable.java
index f23e567..0e1f3ad 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogTable.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogTable.java
@@ -18,7 +18,7 @@
package cn.sliew.sakura.dao.entity;
-import cn.sliew.sakura.common.dict.CatalogTableKind;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.java
index 9449075..cf9b892 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.java
@@ -18,6 +18,7 @@
package cn.sliew.sakura.dao.mapper;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
import cn.sliew.sakura.dao.entity.CatalogFunction;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@@ -29,11 +30,11 @@
@Mapper
public interface CatalogFunctionMapper extends BaseMapper {
- List selectByDatabase(@Param("catalog") String catalog, @Param("database") String database);
+ List selectByDatabase(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database);
- int countByDatabase(@Param("catalog") String catalog, @Param("database") String database);
+ int countByDatabase(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database);
- Optional selectByName(@Param("catalog") String catalog, @Param("database") String database, @Param("name") String name);
+ Optional selectByName(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database, @Param("name") String name);
- int deleteByName(@Param("catalog") String catalog, @Param("database") String database, @Param("name") String name);
+ int deleteByName(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database, @Param("name") String name);
}
diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogTableMapper.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogTableMapper.java
index b5c4530..5558755 100644
--- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogTableMapper.java
+++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogTableMapper.java
@@ -18,7 +18,8 @@
package cn.sliew.sakura.dao.mapper;
-import cn.sliew.sakura.common.dict.CatalogTableKind;
+import cn.sliew.sakura.common.dict.catalog.CatalogType;
+import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import cn.sliew.sakura.dao.entity.CatalogTable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@@ -30,11 +31,11 @@
@Mapper
public interface CatalogTableMapper extends BaseMapper {
- List selectByDatabase(@Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind);
+ List selectByDatabase(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind);
- int countByDatabase(@Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind);
+ int countByDatabase(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind);
- Optional selectByName(@Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind, @Param("name") String name);
+ Optional selectByName(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind, @Param("name") String name);
- int deleteByName(@Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind, @Param("name") String name);
+ int deleteByName(@Param("type") CatalogType type, @Param("catalog") String catalog, @Param("database") String database, @Param("kind") CatalogTableKind kind, @Param("name") String name);
}
diff --git a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogDatabaseMapper.xml b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogDatabaseMapper.xml
index 607df99..ec3831a 100644
--- a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogDatabaseMapper.xml
+++ b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogDatabaseMapper.xml
@@ -23,6 +23,8 @@
+
+
@@ -30,8 +32,8 @@
- id, create_time, update_time,
- catalog, `name`, properties, remark
+ id, create_time, update_time, delete_time,
+ `type`, catalog, `name`, properties, remark
diff --git a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.xml b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.xml
index 7a7ae1e..f19521b 100644
--- a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.xml
+++ b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.xml
@@ -23,6 +23,7 @@
+
@@ -31,7 +32,7 @@
- id,create_time,update_time,
+ id, create_time, update_time, delete_time,
database_id, `name`, class_name, function_language, remark
@@ -40,7 +41,8 @@
f.*
FROM catalog_function f
JOIN catalog_database d ON f.database_id = d.id
- WHERE d.catalog = #{catalog}
+ WHERE d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
ORDER BY f.`name`, f.create_time
@@ -49,7 +51,8 @@
SELECT COUNT(*)
FROM catalog_function f
JOIN catalog_database d ON f.database_id = d.id
- WHERE d.catalog = #{catalog}
+ WHERE d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
@@ -58,20 +61,22 @@
f.*
FROM catalog_function f
JOIN catalog_database d ON f.database_id = d.id
- WHERE d.catalog = #{catalog}
+ WHERE d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
AND f.`name` = #{name}
-
- DELETE
- FROM catalog_function
+
+ UPDATE catalog_function
+ SET delete_time = NOW()
WHERE id = (SELECT ct.id
FROM (SELECT f.id
FROM catalog_function f
JOIN catalog_database d ON f.database_id = d.id
- WHERE d.catalog = #{catalog}
+ WHERE d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
AND f.`name` = #{name}) AS ct)
-
+
diff --git a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml
index edd8fae..761397a 100644
--- a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml
+++ b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml
@@ -23,13 +23,15 @@
+
+
- id, create_time, update_time,
- `catalog_name`, configuration
+ id, create_time, update_time, delete_time,
+ `type`, `catalog_name`, configuration
diff --git a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogTableMapper.xml b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogTableMapper.xml
index 14160e5..305bdf0 100644
--- a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogTableMapper.xml
+++ b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogTableMapper.xml
@@ -23,6 +23,7 @@
+
@@ -35,7 +36,7 @@
- id, create_time, update_time,
+ id, create_time, update_time, delete_time,
database_id, kind, `name`, properties, `schema`, original_query, expanded_query, remark
@@ -46,7 +47,8 @@
catalog_table t
JOIN catalog_database d ON t.database_id = d.id
WHERE
- d.catalog = #{catalog}
+ d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
AND t.kind = #{kind}
ORDER BY t.`name`, t.create_time
@@ -57,7 +59,8 @@
FROM catalog_table t
JOIN catalog_database d ON t.database_id = d.id
WHERE
- d.catalog = #{catalog}
+ d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
AND t.kind = #{kind}
@@ -68,23 +71,25 @@
FROM catalog_table t
JOIN catalog_database d ON t.database_id = d.id
WHERE
- d.catalog = #{catalog}
+ d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
AND t.kind = #{kind}
AND t.`name` = #{name}
-
- DELETE
- FROM catalog_table
+
+ UPDATE catalog_table
+ SET delete_time = NOW()
WHERE id = (SELECT ct.id
FROM (SELECT t.id
FROM catalog_table t
JOIN catalog_database d ON t.database_id = d.id
- WHERE d.catalog = #{catalog}
+ WHERE d.type = #{type}
+ AND d.catalog = #{catalog}
AND d.`name` = #{database}
AND t.kind = #{kind}
AND t.`name` = #{name}) AS ct
)
-
+
diff --git a/tools/docker/mysql/init.d/sakura-mysq.sql b/tools/docker/mysql/init.d/sakura-mysq.sql
index 4b7738a..2ade76d 100644
--- a/tools/docker/mysql/init.d/sakura-mysq.sql
+++ b/tools/docker/mysql/init.d/sakura-mysq.sql
@@ -5,26 +5,30 @@ DROP TABLE IF EXISTS `catalog_store`;
CREATE TABLE `catalog_store`
(
id BIGINT NOT NULL AUTO_INCREMENT,
+ type VARCHAR(8) NOT NULL,
catalog_name VARCHAR(256) NOT NULL,
configuration TEXT,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ delete_time DATETIME,
PRIMARY KEY (id),
- UNIQUE KEY uniq_catalog (catalog_name)
+ UNIQUE KEY uniq_catalog (type, catalog_name, delete_time)
) ENGINE = InnoDB COMMENT ='catalog';
DROP TABLE IF EXISTS `catalog_database`;
CREATE TABLE `catalog_database`
(
id BIGINT NOT NULL AUTO_INCREMENT,
+ type VARCHAR(8) NOT NULL,
catalog VARCHAR(256) NOT NULL,
`name` VARCHAR(256) NOT NULL,
properties TEXT,
remark VARCHAR(256),
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ delete_time DATETIME,
PRIMARY KEY (id),
- UNIQUE KEY uniq_name (catalog, `name`)
+ UNIQUE KEY uniq_name (type, catalog, `name`, delete_time)
) ENGINE = InnoDB COMMENT ='database';
DROP TABLE IF EXISTS `catalog_table`;
@@ -41,8 +45,9 @@ CREATE TABLE `catalog_table`
remark VARCHAR(256),
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ delete_time DATETIME,
PRIMARY KEY (id),
- UNIQUE KEY uniq_name (database_id, kind, `name`)
+ UNIQUE KEY uniq_name (database_id, kind, `name`, delete_time)
) ENGINE = InnoDB COMMENT ='table';
DROP TABLE IF EXISTS catalog_function;
@@ -56,6 +61,7 @@ CREATE TABLE `catalog_function`
remark VARCHAR(256),
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ delete_time DATETIME,
PRIMARY KEY (id),
- UNIQUE KEY uniq_name (database_id, `name`)
+ UNIQUE KEY uniq_name (database_id, `name`, delete_time)
) ENGINE = InnoDB COMMENT ='function';