Skip to content

Commit

Permalink
feature: update flink catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqi committed Mar 7, 2024
1 parent cf6fbae commit f910ad8
Show file tree
Hide file tree
Showing 42 changed files with 313 additions and 401 deletions.
15 changes: 4 additions & 11 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@

<flink.version>1.18.1</flink.version>
<flink.scope>provided</flink.scope>
<flink.shaded.jackson.version>2.15.3-18.0</flink.shaded.jackson.version>
<flink.shaded.guava.version>32.1.3-jre-18.0</flink.shaded.guava.version>
<guava.version>32.1.3-jre</guava.version>
<org.projectlombok.version>1.18.30</org.projectlombok.version>
<milky.version>1.0.12</milky.version>
<mybatis.plus.version>3.5.4</mybatis.plus.version>
Expand Down Expand Up @@ -145,16 +145,9 @@
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>${flink.shaded.jackson.version}</version>
<scope>${flink.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>${flink.shaded.guava.version}</version>
<scope>${flink.scope}</scope>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
import cn.sliew.sakura.catalog.service.impl.CatalogServiceImpl;
import cn.sliew.sakura.common.dict.catalog.CatalogType;
import cn.sliew.sakura.dao.util.MybatisUtil;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.table.catalog.*;
Expand Down Expand Up @@ -73,19 +74,19 @@ public void close() throws CatalogException {

@Override
public List<String> listDatabases() throws CatalogException {
return catalogService.listDatabases(getName()).stream().map(CatalogDatabaseDTO::getName).collect(Collectors.toList());
return catalogService.listDatabases(CatalogType.FLINK, getName()).stream().map(CatalogDatabaseDTO::getName).collect(Collectors.toList());
}

@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
Optional<CatalogDatabaseDTO> optional = catalogService.getDatabase(getName(), databaseName);
Optional<CatalogDatabaseDTO> optional = catalogService.getDatabase(CatalogType.FLINK, getName(), databaseName);
return optional.map(CatalogDatabaseFactory::toDatabase)
.orElseThrow(() -> new DatabaseNotExistException(getName(), databaseName));
}

@Override
public boolean databaseExists(String databaseName) throws CatalogException {
return catalogService.databaseExists(getName(), databaseName);
return catalogService.databaseExists(CatalogType.FLINK, getName(), databaseName);
}

@Override
Expand All @@ -102,7 +103,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
}
}

if (!cascade && !catalogService.isDatabaseEmpty(getName(), name)) {
if (!cascade && !catalogService.isDatabaseEmpty(CatalogType.FLINK, getName(), name)) {
throw new DatabaseNotEmptyException(getName(), name);
}

Expand All @@ -113,7 +114,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
} catch (TableNotExistException ignored) {
}
});
catalogService.deleteDatabase(getName(), name);
catalogService.deleteDatabase(CatalogType.FLINK, getName(), name);
} catch (DatabaseNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
Expand All @@ -132,32 +133,32 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
if (databaseExists(databaseName) == false) {
throw new DatabaseNotExistException(getName(), databaseName);
}
return catalogService.listTables(getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
return catalogService.listTables(CatalogType.FLINK, getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
}

@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
if (databaseExists(databaseName) == false) {
throw new DatabaseNotExistException(getName(), databaseName);
}
return catalogService.listViews(getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
return catalogService.listViews(CatalogType.FLINK, getName(), databaseName).stream().map(CatalogTableDTO::getName).collect(Collectors.toList());
}

@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
String database = tablePath.getDatabaseName();
String table = tablePath.getObjectName();
Optional<CatalogBaseTable> tableOptional = catalogService.getTable(getName(), database, table).map(CatalogTableFactory::toTable);
Optional<CatalogBaseTable> viewOptional = catalogService.getView(getName(), database, table).map(CatalogViewFactory::toView);
Optional<CatalogBaseTable> tableOptional = catalogService.getTable(CatalogType.FLINK, getName(), database, table).map(CatalogTableFactory::toTable);
Optional<CatalogBaseTable> viewOptional = catalogService.getView(CatalogType.FLINK, getName(), database, table).map(CatalogViewFactory::toView);
return tableOptional.or(() -> viewOptional).orElseThrow(() -> new TableNotExistException(getName(), tablePath));
}

@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
String database = tablePath.getDatabaseName();
String table = tablePath.getObjectName();
boolean tableExists = catalogService.tableExists(getName(), database, table);
boolean viewExists = catalogService.viewExists(getName(), database, table);
boolean tableExists = catalogService.tableExists(CatalogType.FLINK, getName(), database, table);
boolean viewExists = catalogService.viewExists(CatalogType.FLINK, getName(), database, table);
return tableExists || viewExists;
}

Expand All @@ -170,10 +171,10 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws Ta
CatalogBaseTable object = getTable(tablePath);
switch (object.getTableKind()) {
case TABLE:
catalogService.deleteTable(getName(), database, table);
catalogService.deleteTable(CatalogType.FLINK, getName(), database, table);
break;
case VIEW:
catalogService.deleteView(getName(), database, table);
catalogService.deleteView(CatalogType.FLINK, getName(), database, table);
break;
default:
throw new IllegalArgumentException("Unknown table type: " + object.getTableKind());
Expand All @@ -194,10 +195,10 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
CatalogBaseTable object = getTable(tablePath);
switch (object.getTableKind()) {
case TABLE:
catalogService.renameTable(getName(), database, table, newTableName);
catalogService.renameTable(CatalogType.FLINK, getName(), database, table, newTableName);
break;
case VIEW:
catalogService.renameView(getName(), database, table, newTableName);
catalogService.renameView(CatalogType.FLINK, getName(), database, table, newTableName);
break;
default:
throw new IllegalArgumentException("Unknown table type: " + object.getTableKind());
Expand All @@ -218,7 +219,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable catalogBaseTable,
CatalogTable catalogTable = (CatalogTable) catalogBaseTable;
CatalogTableDTO catalogTableDTO = CatalogTableFactory.fromResolvedTable(table, catalogTable);
try {
catalogService.insertTable(getName(), database, catalogTableDTO);
catalogService.insertTable(CatalogType.FLINK, getName(), database, catalogTableDTO);
break;
} catch (TableAlreadyExistException e) {
if (!ignoreIfExists) {
Expand All @@ -229,7 +230,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable catalogBaseTable,
ResolvedCatalogView catalogView = (ResolvedCatalogView) catalogBaseTable;
CatalogTableDTO catalogViewDTO = CatalogViewFactory.fromResolvedView(table, catalogView);
try {
catalogService.insertView(getName(), database, catalogViewDTO);
catalogService.insertView(CatalogType.FLINK, getName(), database, catalogViewDTO);
} catch (TableAlreadyExistException e) {
if (!ignoreIfExists) {
throw e;
Expand All @@ -256,12 +257,12 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean
case TABLE:
CatalogTable catalogTable = (CatalogTable) newTable;
CatalogTableDTO catalogTableDTO = CatalogTableFactory.fromResolvedTable(table, catalogTable);
catalogService.updateTable(getName(), database, catalogTableDTO);
catalogService.updateTable(CatalogType.FLINK, getName(), database, catalogTableDTO);
break;
case VIEW:
ResolvedCatalogView catalogView = (ResolvedCatalogView) newTable;
CatalogTableDTO catalogViewDTO = CatalogViewFactory.fromResolvedView(table, catalogView);
catalogService.updateView(getName(), database, catalogViewDTO);
catalogService.updateView(CatalogType.FLINK, getName(), database, catalogViewDTO);
break;
default:
throw new IllegalArgumentException("Unknown table type: " + currentTable.getTableKind());
Expand Down Expand Up @@ -330,14 +331,14 @@ public List<String> listFunctions(String dbName) throws DatabaseNotExistExceptio
if (databaseExists(dbName) == false) {
throw new DatabaseNotExistException(getName(), dbName);
}
return catalogService.listFunctions(getName(), dbName).stream().map(CatalogFunctionDTO::getName).collect(Collectors.toList());
return catalogService.listFunctions(CatalogType.FLINK, getName(), dbName).stream().map(CatalogFunctionDTO::getName).collect(Collectors.toList());
}

@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
String database = functionPath.getDatabaseName();
String function = functionPath.getObjectName();
return catalogService.getFunction(getName(), database, function)
return catalogService.getFunction(CatalogType.FLINK, getName(), database, function)
.map(CatalogFunctionFactory::toCatalogFunction)
.orElseThrow(() -> new FunctionNotExistException(getName(), functionPath));
}
Expand All @@ -346,7 +347,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
String database = functionPath.getDatabaseName();
String function = functionPath.getObjectName();
return catalogService.functionExists(getName(), database, function);
return catalogService.functionExists(CatalogType.FLINK, getName(), database, function);
}

@Override
Expand All @@ -365,7 +366,7 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) thr
String function = functionPath.getObjectName();

try {
catalogService.deleteFunction(getName(), database, function);
catalogService.deleteFunction(CatalogType.FLINK, getName(), database, function);
} catch (FunctionNotExistException e) {
if (!ignoreIfNotExists) {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package cn.sliew.sakura.catalog.factory;

import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
import cn.sliew.sakura.common.dict.CatalogFunctionLanguage;
import cn.sliew.sakura.common.dict.catalog.flink.CatalogFunctionLanguage;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.FunctionLanguage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import cn.sliew.sakura.catalog.service.dto.SchemaDTO;
import cn.sliew.sakura.catalog.service.dto.UniqueConstraintDTO;
import cn.sliew.sakura.catalog.service.dto.WatermarkDTO;
import cn.sliew.sakura.common.dict.CatalogColumnType;
import cn.sliew.sakura.common.dict.CatalogConstraintType;
import cn.sliew.sakura.common.dict.catalog.flink.CatalogColumnType;
import cn.sliew.sakura.common.dict.catalog.flink.CatalogConstraintType;
import cn.sliew.sakura.common.exception.Rethrower;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import cn.sliew.sakura.catalog.SakuraCatalogTable;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
import cn.sliew.sakura.common.dict.CatalogTableKind;
import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import org.apache.commons.lang3.EnumUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import cn.sliew.sakura.catalog.SakuraCatalogView;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
import cn.sliew.sakura.common.dict.CatalogTableKind;
import cn.sliew.sakura.common.dict.catalog.flink.CatalogTableKind;
import org.apache.commons.lang3.EnumUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,64 +21,65 @@
import cn.sliew.sakura.catalog.service.dto.CatalogDatabaseDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogFunctionDTO;
import cn.sliew.sakura.catalog.service.dto.CatalogTableDTO;
import cn.sliew.sakura.common.dict.catalog.CatalogType;
import org.apache.flink.table.catalog.exceptions.*;

import java.util.List;
import java.util.Optional;

public interface CatalogService {

List<CatalogDatabaseDTO> listDatabases(String catalog);
List<CatalogDatabaseDTO> listDatabases(CatalogType type, String catalog);

Optional<CatalogDatabaseDTO> getDatabase(String catalog, String database);
Optional<CatalogDatabaseDTO> getDatabase(CatalogType type, String catalog, String database);

boolean databaseExists(String catalog, String database);
boolean databaseExists(CatalogType type, String catalog, String database);

void insertDatabase(CatalogDatabaseDTO database) throws DatabaseAlreadyExistException;

void updateDatabase(CatalogDatabaseDTO database) throws DatabaseNotExistException;

void deleteDatabase(String catalog, String database) throws DatabaseNotExistException;
void deleteDatabase(CatalogType type, String catalog, String database) throws DatabaseNotExistException;

boolean isDatabaseEmpty(String catalog, String database);
boolean isDatabaseEmpty(CatalogType type, String catalog, String database);

List<CatalogTableDTO> listTables(String catalog, String database);
List<CatalogTableDTO> listTables(CatalogType type, String catalog, String database);

Optional<CatalogTableDTO> getTable(String catalog, String database, String table);
Optional<CatalogTableDTO> getTable(CatalogType type, String catalog, String database, String table);

boolean tableExists(String catalog, String database, String table);
boolean tableExists(CatalogType type, String catalog, String database, String table);

void insertTable(String catalog, String database, CatalogTableDTO table) throws DatabaseNotExistException, TableAlreadyExistException;
void insertTable(CatalogType type, String catalog, String database, CatalogTableDTO table) throws DatabaseNotExistException, TableAlreadyExistException;

void updateTable(String catalog, String database, CatalogTableDTO table) throws TableNotExistException;
void updateTable(CatalogType type, String catalog, String database, CatalogTableDTO table) throws TableNotExistException;

void renameTable(String catalog, String database, String currentName, String newName) throws TableAlreadyExistException, TableNotExistException;
void renameTable(CatalogType type, String catalog, String database, String currentName, String newName) throws TableAlreadyExistException, TableNotExistException;

void deleteTable(String catalog, String database, String table) throws TableNotExistException;
void deleteTable(CatalogType type, String catalog, String database, String table) throws TableNotExistException;

List<CatalogTableDTO> listViews(String catalog, String database);
List<CatalogTableDTO> listViews(CatalogType type, String catalog, String database);

Optional<CatalogTableDTO> getView(String catalog, String database, String view);
Optional<CatalogTableDTO> getView(CatalogType type, String catalog, String database, String view);

boolean viewExists(String catalog, String database, String view);
boolean viewExists(CatalogType type, String catalog, String database, String view);

void insertView(String catalog, String database, CatalogTableDTO view) throws DatabaseNotExistException, TableAlreadyExistException;
void insertView(CatalogType type, String catalog, String database, CatalogTableDTO view) throws DatabaseNotExistException, TableAlreadyExistException;

void updateView(String catalog, String database, CatalogTableDTO view) throws TableNotExistException;
void updateView(CatalogType type, String catalog, String database, CatalogTableDTO view) throws TableNotExistException;

void renameView(String catalog, String database, String currentName, String newName) throws TableNotExistException, TableAlreadyExistException;
void renameView(CatalogType type, String catalog, String database, String currentName, String newName) throws TableNotExistException, TableAlreadyExistException;

void deleteView(String catalog, String database, String viewName) throws TableNotExistException;
void deleteView(CatalogType type, String catalog, String database, String viewName) throws TableNotExistException;

List<CatalogFunctionDTO> listFunctions(String catalog, String database);
List<CatalogFunctionDTO> listFunctions(CatalogType type, String catalog, String database);

Optional<CatalogFunctionDTO> getFunction(String catalog, String database, String function);
Optional<CatalogFunctionDTO> getFunction(CatalogType type, String catalog, String database, String function);

boolean functionExists(String catalog, String database, String function);
boolean functionExists(CatalogType type, String catalog, String database, String function);

void insertFunction(String catalog, String database, CatalogFunctionDTO function) throws DatabaseNotExistException, FunctionAlreadyExistException;
void insertFunction(CatalogType type, String catalog, String database, CatalogFunctionDTO function) throws DatabaseNotExistException, FunctionAlreadyExistException;

void updateFunction(String catalog, String database, CatalogFunctionDTO function) throws FunctionNotExistException;
void updateFunction(CatalogType type, String catalog, String database, CatalogFunctionDTO function) throws FunctionNotExistException;

void deleteFunction(String catalog, String database, String functionName) throws FunctionNotExistException;
void deleteFunction(CatalogType type, String catalog, String database, String functionName) throws FunctionNotExistException;
}
Loading

0 comments on commit f910ad8

Please sign in to comment.