diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java new file mode 100644 index 0000000..a3c76f3 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/CatalogStoreService.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.service; + +import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO; + +import java.util.List; +import java.util.Optional; + +public interface CatalogStoreService { + + List list(); + + Optional get(String catalogName); + + void insert(CatalogStoreDTO dto); + + void delete(String catalogName); +} diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java new file mode 100644 index 0000000..ef0e110 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/convert/CatalogStoreConvert.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.service.convert; + +import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO; +import cn.sliew.sakura.common.exception.Rethrower; +import cn.sliew.sakura.common.util.CodecUtil; +import cn.sliew.sakura.common.util.JacksonUtil; +import cn.sliew.sakura.dao.entity.CatalogStore; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; + +import java.util.Map; + +public enum CatalogStoreConvert implements BaseConvert { + INSTANCE; + + @Override + public CatalogStore toDo(CatalogStoreDTO dto) { + try { + CatalogStore entity = new CatalogStore(); + Util.copyProperties(dto, entity); + entity.setCatalogName(dto.getCatalogName()); + if (dto.getConfiguration() != null) { + entity.setConfiguration(CodecUtil.decrypt(JacksonUtil.toJsonString(dto.getConfiguration()))); + } + return entity; + } catch (Exception e) { + Rethrower.throwAs(e); + return null; + } + } + + @Override + public CatalogStoreDTO toDto(CatalogStore entity) { + try { + CatalogStoreDTO dto = new CatalogStoreDTO(); + Util.copyProperties(entity, dto); + dto.setCatalogName(entity.getCatalogName()); + if (entity != null && StringUtils.isNotBlank(entity.getConfiguration())) { + Map configuration = JacksonUtil.parseJsonString(CodecUtil.decrypt(entity.getConfiguration()), new TypeReference>() { + }); + dto.setConfiguration(Configuration.fromMap(configuration)); + } + return dto; + } catch (Exception e) { + Rethrower.throwAs(e); + return null; + } + } +} diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java new file mode 100644 index 0000000..9534bd5 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/dto/CatalogStoreDTO.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.service.dto; + +import lombok.Data; +import org.apache.flink.configuration.Configuration; + +@Data +public class CatalogStoreDTO extends BaseDTO { + + private String catalogName; + private Configuration configuration; +} diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java new file mode 100644 index 0000000..c0173f6 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/service/impl/CatalogStoreServiceImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.service.impl; + +import cn.sliew.sakura.catalog.service.CatalogStoreService; +import cn.sliew.sakura.catalog.service.convert.CatalogStoreConvert; +import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO; +import cn.sliew.sakura.dao.entity.CatalogStore; +import cn.sliew.sakura.dao.mapper.CatalogStoreMapper; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; +import com.baomidou.mybatisplus.core.toolkit.Wrappers; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; + +import java.util.List; +import java.util.Optional; + +public class CatalogStoreServiceImpl implements CatalogStoreService { + + private final SqlSessionFactory sqlSessionFactory; + + public CatalogStoreServiceImpl(SqlSessionFactory sqlSessionFactory) { + this.sqlSessionFactory = sqlSessionFactory; + } + + @Override + public List list() { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class); + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogStore.class) + .orderByAsc(CatalogStore::getCatalogName); + List catalogs = catalogStoreMapper.selectList(queryWrapper); + return CatalogStoreConvert.INSTANCE.toDto(catalogs); + } + } + + @Override + public Optional get(String catalogName) { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class); + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(CatalogStore.class) + .eq(CatalogStore::getCatalogName, catalogName); + CatalogStore catalog = catalogStoreMapper.selectOne(queryWrapper); + return Optional.ofNullable(catalog).map(CatalogStoreConvert.INSTANCE::toDto); + } + } + + @Override + public void insert(CatalogStoreDTO dto) { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class); + CatalogStore record = CatalogStoreConvert.INSTANCE.toDo(dto); + catalogStoreMapper.insert(record); + sqlSession.commit(); + } + } + + @Override + public void delete(String catalogName) { + try (SqlSession sqlSession = sqlSessionFactory.openSession()) { + CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class); + LambdaUpdateWrapper updateWrapper = Wrappers.lambdaUpdate(CatalogStore.class) + .eq(CatalogStore::getCatalogName, catalogName); + catalogStoreMapper.delete(updateWrapper); + sqlSession.commit(); + } + } +} diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java new file mode 100644 index 0000000..bb594e4 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStore.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.store; + +import cn.sliew.sakura.catalog.service.CatalogStoreService; +import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO; +import cn.sliew.sakura.catalog.service.impl.CatalogStoreServiceImpl; +import org.apache.flink.table.catalog.AbstractCatalogStore; +import org.apache.flink.table.catalog.CatalogDescriptor; +import cn.sliew.sakura.dao.util.MybatisUtil; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.ibatis.session.SqlSessionFactory; + +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class JdbcCatalogStore extends AbstractCatalogStore { + + private final String driver; + private final String jdbcUrl; + private final String username; + private final String password; + private HikariDataSource dataSource; + private CatalogStoreService catalogStoreService; + + public JdbcCatalogStore(String driver, String jdbcUrl, String username, String password) { + this.driver = driver; + this.jdbcUrl = jdbcUrl; + this.username = username; + this.password = password; + } + + @Override + public void open() { + super.open(); + this.dataSource = MybatisUtil.createDataSource(driver, jdbcUrl, username, password); + SqlSessionFactory sqlSessionFactory = MybatisUtil.getSqlSessionFactory(dataSource); + this.catalogStoreService = new CatalogStoreServiceImpl(sqlSessionFactory); + } + + @Override + public void close() { + super.close(); + if (dataSource != null && dataSource.isClosed() == false) { + dataSource.close(); + } + } + + @Override + public void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException { + if (contains(catalogName)) { + throw new CatalogException(String.format("Catalog %s's store is already exist.", catalogName)); + } + CatalogStoreDTO dto = new CatalogStoreDTO(); + dto.setCatalogName(catalog.getCatalogName()); + dto.setConfiguration(catalog.getConfiguration()); + catalogStoreService.insert(dto); + } + + @Override + public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException { + if (contains(catalogName) == false && ignoreIfNotExists == false) { + throw new CatalogException(String.format("Catalog %s's store is not exist", catalogName)); + } + catalogStoreService.delete(catalogName); + } + + @Override + public Optional getCatalog(String catalogName) throws CatalogException { + return catalogStoreService.get(catalogName).map(dto -> CatalogDescriptor.of(dto.getCatalogName(), dto.getConfiguration())); + } + + @Override + public Set listCatalogs() throws CatalogException { + return catalogStoreService.list().stream().map(CatalogStoreDTO::getCatalogName).collect(Collectors.toSet()); + } + + @Override + public boolean contains(String catalogName) throws CatalogException { + return getCatalog(catalogName).isPresent(); + } +} diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStoreFactory.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStoreFactory.java new file mode 100644 index 0000000..5e56428 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStoreFactory.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.store; + +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static cn.sliew.sakura.catalog.store.JdbcCatalogStoreOptions.*; + +public class JdbcCatalogStoreFactory implements CatalogStoreFactory { + + private String driver; + private String jdbcUrl; + private String username; + private String password; + + @Override + public CatalogStore createCatalogStore() { + return new JdbcCatalogStore(driver, jdbcUrl, username, password); + } + + @Override + public void open(Context context) throws CatalogException { + FactoryUtil.FactoryHelper factoryHelper = new CatalogStoreFactoryHelper(this, context); + factoryHelper.validate(); + + ReadableConfig options = factoryHelper.getOptions(); + this.driver = options.get(DRIVER); + this.jdbcUrl = options.get(JDBC_URL); + this.username = options.get(USERNAME); + this.password = options.get(PASSWORD); + } + + @Override + public void close() throws CatalogException { + + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(DRIVER); + options.add(JDBC_URL); + options.add(USERNAME); + options.add(PASSWORD); + return options; + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + public static class CatalogStoreFactoryHelper extends FactoryUtil.FactoryHelper { + public CatalogStoreFactoryHelper(CatalogStoreFactory catalogStoreFactory, CatalogStoreFactory.Context context) { + super(catalogStoreFactory, context.getOptions(), FactoryUtil.PROPERTY_VERSION); + } + } +} diff --git a/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStoreOptions.java b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStoreOptions.java new file mode 100644 index 0000000..24e60b8 --- /dev/null +++ b/sakura-catalog/src/main/java/cn/sliew/sakura/catalog/store/JdbcCatalogStoreOptions.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.catalog.store; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public enum JdbcCatalogStoreOptions { + ; + + public static final String IDENTIFIER = "jdbc"; + + public static final ConfigOption DRIVER = + ConfigOptions.key("driver").stringType().noDefaultValue(); + + public static final ConfigOption JDBC_URL = + ConfigOptions.key("jdbcUrl").stringType().noDefaultValue(); + + public static final ConfigOption USERNAME = + ConfigOptions.key("username").stringType().noDefaultValue(); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("password").stringType().noDefaultValue(); +} diff --git a/sakura-catalog/src/main/java/org/apache/flink/table/catalog/AbstractCatalogStore.java b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/AbstractCatalogStore.java new file mode 100644 index 0000000..ab2a288 --- /dev/null +++ b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/AbstractCatalogStore.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.util.Preconditions; + +/** The AbstractCatalogStore class is an abstract base class for implementing a catalog store. */ +public abstract class AbstractCatalogStore implements CatalogStore { + + /** Catalog store state. */ + protected boolean isOpen; + + /** Opens the catalog store. */ + @Override + public void open() { + isOpen = true; + } + + /** Closes the catalog store. */ + @Override + public void close() { + isOpen = false; + } + + /** + * Checks whether the catalog store is currently open. + * + * @throws IllegalStateException if the store is closed + */ + protected void checkOpenState() { + Preconditions.checkState(isOpen, "CatalogStore is not opened yet."); + } +} diff --git a/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java new file mode 100644 index 0000000..a1bfb91 --- /dev/null +++ b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogDescriptor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.Configuration; + +/** + * Describes a {@link Catalog} with the catalog name and configuration. + * + *

A {@link CatalogDescriptor} is a template for creating a {@link Catalog} instance. It closely + * resembles the "CREATE CATALOG" SQL DDL statement, containing catalog name and catalog + * configuration. A {@link CatalogDescriptor} could be stored to {@link CatalogStore}. + * + *

This can be used to register a catalog in the Table API + */ +@PublicEvolving +public class CatalogDescriptor { + + /* Catalog name */ + private final String catalogName; + + /* The configuration used to discover and construct the catalog. */ + private final Configuration configuration; + + public String getCatalogName() { + return catalogName; + } + + public Configuration getConfiguration() { + return configuration; + } + + private CatalogDescriptor(String catalogName, Configuration configuration) { + this.catalogName = catalogName; + this.configuration = configuration; + } + + /** + * Creates an instance of this interface. + * + * @param catalogName the name of the catalog + * @param configuration the configuration of the catalog + */ + public static CatalogDescriptor of(String catalogName, Configuration configuration) { + return new CatalogDescriptor(catalogName, configuration); + } +} diff --git a/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogStore.java b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogStore.java new file mode 100644 index 0000000..af35927 --- /dev/null +++ b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogStore.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import java.util.Optional; +import java.util.Set; + +/** + * Represents the storage where persists all {@link Catalog}s. + * + *

All catalogs can be lazy initialized with the {@link CatalogStore}. + * + *

It can be used in {@code CatalogManager} to retrieve, save and remove catalog in {@link + * CatalogDescriptor} format at the external storage system. + */ +@PublicEvolving +public interface CatalogStore { + + /** + * Stores a catalog under the given catalog name. The catalog name must be unique. + * + * @param catalogName the given catalog name under which to store the given catalog + * @param catalog catalog descriptor to store + * @throws CatalogException throw when registration failed + */ + void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException; + + /** + * Remove a catalog with the given catalog name. + * + * @param catalogName the given catalog name under which to remove the given catalog + * @param ignoreIfNotExists whether throw an exception when the catalog does not exist + * @throws CatalogException throw when the removal operation failed + */ + void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException; + + /** + * Get a catalog by name. + * + * @param catalogName name of the catalog to retrieve + * @return the requested catalog or empty if the catalog does not exist + * @throws CatalogException in case of any runtime exception + */ + Optional getCatalog(String catalogName) throws CatalogException; + + /** + * Retrieves the names of all registered catalogs. + * + * @return the names of registered catalogs + * @throws CatalogException in case of any runtime exception + */ + Set listCatalogs() throws CatalogException; + + /** + * Return whether the catalog exists in the catalog store. + * + * @param catalogName the name of catalog + * @throws CatalogException in case of any runtime exception + */ + boolean contains(String catalogName) throws CatalogException; + + /** + * Open the catalog store. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + void open() throws CatalogException; + + /** + * Close the catalog store when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + void close() throws CatalogException; +} diff --git a/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogStoreHolder.java b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogStoreHolder.java new file mode 100644 index 0000000..d58d0c9 --- /dev/null +++ b/sakura-catalog/src/main/java/org/apache/flink/table/catalog/CatalogStoreHolder.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.factories.CatalogStoreFactory; + +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A holder for a {@link CatalogStore} instance and the necessary information for creating and + * initializing {@link Catalog} instances, including a {@link CatalogStoreFactory}, a {@link + * ReadableConfig} instance, and a {@link ClassLoader} instance. This class provides automatic + * resource management using the {@link AutoCloseable} interface, ensuring that the catalog-related + * resources are properly closed and released when they are no longer needed. + * + *

A {@link CatalogStoreFactory} may create multiple {@link CatalogStore} instances, which can be + * useful in SQL gateway scenarios where different sessions may use different catalog stores. + * However, in some scenarios, a single {@link CatalogStore} instance may be sufficient, in which + * case the {@link CatalogStoreFactory} can be stored in the holder to ensure that it is properly + * closed when the {@link CatalogStore} is closed. + */ +@Internal +public class CatalogStoreHolder implements AutoCloseable { + + private CatalogStore catalogStore; + + private @Nullable CatalogStoreFactory factory; + + private ReadableConfig config; + + private ClassLoader classLoader; + + private CatalogStoreHolder( + CatalogStore catalogStore, + @Nullable CatalogStoreFactory factory, + ReadableConfig config, + ClassLoader classLoader) { + this.catalogStore = catalogStore; + this.factory = factory; + this.config = config; + this.classLoader = classLoader; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** Builder for a fluent definition of a {@link CatalogStoreHolder}. */ + public static final class Builder { + + private CatalogStore catalogStore; + + private ReadableConfig config; + + private @Nullable ClassLoader classLoader; + + private @Nullable CatalogStoreFactory factory; + + public Builder catalogStore(CatalogStore catalogStore) { + this.catalogStore = catalogStore; + return this; + } + + public Builder config(ReadableConfig config) { + this.config = config; + return this; + } + + public Builder classloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public Builder factory(CatalogStoreFactory factory) { + this.factory = factory; + return this; + } + + public CatalogStoreHolder build() { + checkNotNull(catalogStore, "CatalogStore cannot be null"); + checkNotNull(config, "Config cannot be null"); + checkNotNull(classLoader, "Class loader cannot be null"); + return new CatalogStoreHolder(catalogStore, factory, config, classLoader); + } + } + + public CatalogStore catalogStore() { + return this.catalogStore; + } + + public ReadableConfig config() { + return this.config; + } + + public ClassLoader classLoader() { + return this.classLoader; + } + + public void open() { + this.catalogStore.open(); + } + + @Override + public void close() throws Exception { + this.catalogStore.close(); + + if (this.factory != null) { + this.factory.close(); + } + } +} diff --git a/sakura-catalog/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java b/sakura-catalog/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java new file mode 100644 index 0000000..be246e2 --- /dev/null +++ b/sakura-catalog/src/main/java/org/apache/flink/table/factories/CatalogStoreFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; + +import java.util.Map; + +@PublicEvolving +public interface CatalogStoreFactory extends Factory { + + CatalogStore createCatalogStore(); + + void open(Context context) throws CatalogException; + + void close() throws CatalogException; + + @PublicEvolving + interface Context { + + Map getOptions(); + + ReadableConfig getConfiguration(); + + ClassLoader getClassLoader(); + } +} diff --git a/sakura-catalog/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/sakura-catalog/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index f8ebab6..7fec56c 100644 --- a/sakura-catalog/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/sakura-catalog/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -cn.sliew.sakura.catalog.SakuraCatalogFactory \ No newline at end of file +cn.sliew.sakura.catalog.SakuraCatalogFactory +cn.sliew.sakura.catalog.store.JdbcCatalogStoreFactory \ No newline at end of file diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java new file mode 100644 index 0000000..d0d41bc --- /dev/null +++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/entity/CatalogStore.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.dao.entity; + +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +@Data +@TableName("catalog_store") +public class CatalogStore extends BaseDO { + + private static final long serialVersionUID = 1L; + + @TableField("`catalog_name`") + private String catalogName; + + @TableField("configuration") + private String configuration; +} diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.java new file mode 100644 index 0000000..a5d39ac --- /dev/null +++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.sakura.dao.mapper; + +import cn.sliew.sakura.dao.entity.CatalogStore; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface CatalogStoreMapper extends BaseMapper { + +} diff --git a/sakura-dao/src/main/java/cn/sliew/sakura/dao/util/MybatisUtil.java b/sakura-dao/src/main/java/cn/sliew/sakura/dao/util/MybatisUtil.java index a834b93..56a3635 100644 --- a/sakura-dao/src/main/java/cn/sliew/sakura/dao/util/MybatisUtil.java +++ b/sakura-dao/src/main/java/cn/sliew/sakura/dao/util/MybatisUtil.java @@ -101,6 +101,7 @@ private static SqlSessionFactory createSqlSessionFactory(DataSource dataSource) private static List getMapperXmls() { return Arrays.asList( + "cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml", "cn/sliew/sakura/dao/mapper/CatalogDatabaseMapper.xml", "cn/sliew/sakura/dao/mapper/CatalogFunctionMapper.xml", "cn/sliew/sakura/dao/mapper/CatalogTableMapper.xml" diff --git a/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml new file mode 100644 index 0000000..edd8fae --- /dev/null +++ b/sakura-dao/src/main/resources/cn/sliew/sakura/dao/mapper/CatalogStoreMapper.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + id, create_time, update_time, + `catalog_name`, configuration + + diff --git a/tools/docker/mysql/init.d/sakura-mysq.sql b/tools/docker/mysql/init.d/sakura-mysq.sql index 58868c9..4b7738a 100644 --- a/tools/docker/mysql/init.d/sakura-mysq.sql +++ b/tools/docker/mysql/init.d/sakura-mysq.sql @@ -1,6 +1,18 @@ create database if not exists sakura default character set utf8mb4 collate utf8mb4_unicode_ci; use sakura; +DROP TABLE IF EXISTS `catalog_store`; +CREATE TABLE `catalog_store` +( + id BIGINT NOT NULL AUTO_INCREMENT, + catalog_name VARCHAR(256) NOT NULL, + configuration TEXT, + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE KEY uniq_catalog (catalog_name) +) ENGINE = InnoDB COMMENT ='catalog'; + DROP TABLE IF EXISTS `catalog_database`; CREATE TABLE `catalog_database` ( @@ -20,7 +32,7 @@ CREATE TABLE `catalog_table` ( id BIGINT NOT NULL AUTO_INCREMENT, database_id BIGINT NOT NULL, - kind VARCHAR(32) NOT NULL, + kind VARCHAR(32) NOT NULL, `name` VARCHAR(256) NOT NULL, properties TEXT, `schema` TEXT,