Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/kalencaya/sakura
Browse files Browse the repository at this point in the history
  • Loading branch information
kalencaya committed Oct 8, 2023
2 parents 3bae683 + 10b8e75 commit 2dc2510
Show file tree
Hide file tree
Showing 18 changed files with 946 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.sakura.catalog.service;

import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;

import java.util.List;
import java.util.Optional;

public interface CatalogStoreService {

List<CatalogStoreDTO> list();

Optional<CatalogStoreDTO> get(String catalogName);

void insert(CatalogStoreDTO dto);

void delete(String catalogName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.sakura.catalog.service.convert;

import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
import cn.sliew.sakura.common.exception.Rethrower;
import cn.sliew.sakura.common.util.CodecUtil;
import cn.sliew.sakura.common.util.JacksonUtil;
import cn.sliew.sakura.dao.entity.CatalogStore;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;

import java.util.Map;

public enum CatalogStoreConvert implements BaseConvert<CatalogStore, CatalogStoreDTO> {
INSTANCE;

@Override
public CatalogStore toDo(CatalogStoreDTO dto) {
try {
CatalogStore entity = new CatalogStore();
Util.copyProperties(dto, entity);
entity.setCatalogName(dto.getCatalogName());
if (dto.getConfiguration() != null) {
entity.setConfiguration(CodecUtil.decrypt(JacksonUtil.toJsonString(dto.getConfiguration())));
}
return entity;
} catch (Exception e) {
Rethrower.throwAs(e);
return null;
}
}

@Override
public CatalogStoreDTO toDto(CatalogStore entity) {
try {
CatalogStoreDTO dto = new CatalogStoreDTO();
Util.copyProperties(entity, dto);
dto.setCatalogName(entity.getCatalogName());
if (entity != null && StringUtils.isNotBlank(entity.getConfiguration())) {
Map<String, String> configuration = JacksonUtil.parseJsonString(CodecUtil.decrypt(entity.getConfiguration()), new TypeReference<Map<String, String>>() {
});
dto.setConfiguration(Configuration.fromMap(configuration));
}
return dto;
} catch (Exception e) {
Rethrower.throwAs(e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.sakura.catalog.service.dto;

import lombok.Data;
import org.apache.flink.configuration.Configuration;

@Data
public class CatalogStoreDTO extends BaseDTO {

private String catalogName;
private Configuration configuration;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.sakura.catalog.service.impl;

import cn.sliew.sakura.catalog.service.CatalogStoreService;
import cn.sliew.sakura.catalog.service.convert.CatalogStoreConvert;
import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
import cn.sliew.sakura.dao.entity.CatalogStore;
import cn.sliew.sakura.dao.mapper.CatalogStoreMapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;

import java.util.List;
import java.util.Optional;

public class CatalogStoreServiceImpl implements CatalogStoreService {

private final SqlSessionFactory sqlSessionFactory;

public CatalogStoreServiceImpl(SqlSessionFactory sqlSessionFactory) {
this.sqlSessionFactory = sqlSessionFactory;
}

@Override
public List<CatalogStoreDTO> list() {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
LambdaQueryWrapper<CatalogStore> queryWrapper = Wrappers.lambdaQuery(CatalogStore.class)
.orderByAsc(CatalogStore::getCatalogName);
List<CatalogStore> catalogs = catalogStoreMapper.selectList(queryWrapper);
return CatalogStoreConvert.INSTANCE.toDto(catalogs);
}
}

@Override
public Optional<CatalogStoreDTO> get(String catalogName) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
LambdaQueryWrapper<CatalogStore> queryWrapper = Wrappers.lambdaQuery(CatalogStore.class)
.eq(CatalogStore::getCatalogName, catalogName);
CatalogStore catalog = catalogStoreMapper.selectOne(queryWrapper);
return Optional.ofNullable(catalog).map(CatalogStoreConvert.INSTANCE::toDto);
}
}

@Override
public void insert(CatalogStoreDTO dto) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
CatalogStore record = CatalogStoreConvert.INSTANCE.toDo(dto);
catalogStoreMapper.insert(record);
sqlSession.commit();
}
}

@Override
public void delete(String catalogName) {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
CatalogStoreMapper catalogStoreMapper = sqlSession.getMapper(CatalogStoreMapper.class);
LambdaUpdateWrapper<CatalogStore> updateWrapper = Wrappers.lambdaUpdate(CatalogStore.class)
.eq(CatalogStore::getCatalogName, catalogName);
catalogStoreMapper.delete(updateWrapper);
sqlSession.commit();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.sakura.catalog.store;

import cn.sliew.sakura.catalog.service.CatalogStoreService;
import cn.sliew.sakura.catalog.service.dto.CatalogStoreDTO;
import cn.sliew.sakura.catalog.service.impl.CatalogStoreServiceImpl;
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import cn.sliew.sakura.dao.util.MybatisUtil;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.ibatis.session.SqlSessionFactory;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class JdbcCatalogStore extends AbstractCatalogStore {

private final String driver;
private final String jdbcUrl;
private final String username;
private final String password;
private HikariDataSource dataSource;
private CatalogStoreService catalogStoreService;

public JdbcCatalogStore(String driver, String jdbcUrl, String username, String password) {
this.driver = driver;
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
}

@Override
public void open() {
super.open();
this.dataSource = MybatisUtil.createDataSource(driver, jdbcUrl, username, password);
SqlSessionFactory sqlSessionFactory = MybatisUtil.getSqlSessionFactory(dataSource);
this.catalogStoreService = new CatalogStoreServiceImpl(sqlSessionFactory);
}

@Override
public void close() {
super.close();
if (dataSource != null && dataSource.isClosed() == false) {
dataSource.close();
}
}

@Override
public void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException {
if (contains(catalogName)) {
throw new CatalogException(String.format("Catalog %s's store is already exist.", catalogName));
}
CatalogStoreDTO dto = new CatalogStoreDTO();
dto.setCatalogName(catalog.getCatalogName());
dto.setConfiguration(catalog.getConfiguration());
catalogStoreService.insert(dto);
}

@Override
public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
if (contains(catalogName) == false && ignoreIfNotExists == false) {
throw new CatalogException(String.format("Catalog %s's store is not exist", catalogName));
}
catalogStoreService.delete(catalogName);
}

@Override
public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
return catalogStoreService.get(catalogName).map(dto -> CatalogDescriptor.of(dto.getCatalogName(), dto.getConfiguration()));
}

@Override
public Set<String> listCatalogs() throws CatalogException {
return catalogStoreService.list().stream().map(CatalogStoreDTO::getCatalogName).collect(Collectors.toSet());
}

@Override
public boolean contains(String catalogName) throws CatalogException {
return getCatalog(catalogName).isPresent();
}
}
Loading

0 comments on commit 2dc2510

Please sign in to comment.