diff --git a/application-oauth2.properties b/application-oauth2.properties index 38735396..02d10310 100644 --- a/application-oauth2.properties +++ b/application-oauth2.properties @@ -9,7 +9,7 @@ spring.security.oauth2.resourceserver.jwt.issuer-uri=${spring.security.oauth2.cl spring.security.oauth2.resourceserver.jwt.jwk-set-uri=${spring.security.oauth2.resourceserver.jwt.issuer-uri}/protocol/openid-connect/certs # This is the claim in the JWT Access Token to use as username. -galapagos.security.jwt-username-claim=preferred_username +galapagos.security.jwt-user-name-claim=preferred_username # This JWT claim is used as the "display name" (usually the full name) of a user. galapagos.security.jwt-display-name-claim=name diff --git a/docs/Keycloak.md b/docs/Keycloak.md deleted file mode 100644 index ad1080a7..00000000 --- a/docs/Keycloak.md +++ /dev/null @@ -1,35 +0,0 @@ -# Keycloak Configuration for Galapagos - -Galapagos currently **requires** a Keycloak as authentication & authorization software. The Keycloak configuration can -be specified in a separate file. This file must then be configured in the property `keycloak.configurationFile`, e.g. - -``` -keycloak.configurationFile=file:./keycloak-local.json -``` - -The Keycloak configuration is -documented [at Github](https://github.com/keycloak/keycloak-documentation/blob/master/securing_apps/topics/oidc/java/java-adapter-config.adoc) -. - -The auto-generated Keycloak config file by the devsetup scripts looks similar to this: - -```json -{ - "auth-server-url": "http://192.168.56.101:32101/auth", - "realm": "galapagos", - "resource": "galapagos-webapp-dev", - "public-client": true, - "use-resource-role-mappings": true, - "principal-attribute": "preferred_username" -} -``` - -If you choose to use another Keycloak instance, you will have to adjust this configuration accordingly. Please note the -following **fix** requirements imposed by Galapagos: - -* Each administrator user **must** have a Role named `admin` assigned -* Each standard user (including administrators) **must** have a Role named `user` assigned - -You can e.g. use Keycloak's integrated identity provider and mapping functions to connect your company's Active -Directory and map users of one AD group to the `admin` role, and users of another AD group to the `user` role (these -roles will have to be defined in your realm if you do setup your own Realm). diff --git a/pom.xml b/pom.xml index 83b886f3..a3dd85e5 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ 17 1.18.28 - 2.7.2 + 3.4.1 v16.14.0 3.7.0.1746 src/main/java @@ -343,7 +343,7 @@ com.google.cloud.tools jib-maven-plugin - 2.7.0 + 3.4.0 diff --git a/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/NoUpdatesAdminClient.java b/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/NoUpdatesAdminClient.java index bc8f93f3..c06cfa4f 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/NoUpdatesAdminClient.java +++ b/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/NoUpdatesAdminClient.java @@ -1,469 +1,73 @@ package com.hermesworld.ais.galapagos.adminjobs.impl; -import org.apache.kafka.clients.admin.*; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.*; +import com.hermesworld.ais.galapagos.kafka.KafkaClusterAdminClient; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.config.ConfigResource; -import org.apache.kafka.common.quota.ClientQuotaAlteration; -import org.apache.kafka.common.quota.ClientQuotaFilter; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.Collection; +import java.util.Map; /** * Helper class used by admin jobs providing a "dry run". This class wraps an existing Kafka AdminClient and throws an * UnsupportedOperationException whenever a method is called which would modify something in the Kafka * cluster. Admin jobs subclass this class and override the calls they are interested in. */ -@SuppressWarnings("deprecation") -public abstract class NoUpdatesAdminClient extends AdminClient { +public abstract class NoUpdatesAdminClient implements KafkaClusterAdminClient { - private final AdminClient delegate; + private final KafkaClusterAdminClient delegate; - public NoUpdatesAdminClient(AdminClient delegate) { + public NoUpdatesAdminClient(KafkaClusterAdminClient delegate) { this.delegate = delegate; } @Override - public void close() { - delegate.close(); - } - - @Override - @Deprecated - public void close(long duration, TimeUnit unit) { - delegate.close(duration, unit); - } - - @Override - public void close(Duration timeout) { - delegate.close(timeout); - } - - @Override - public CreateTopicsResult createTopics(Collection newTopics) { - throw new UnsupportedOperationException(); - } - - @Override - public CreateTopicsResult createTopics(Collection newTopics, CreateTopicsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteTopicsResult deleteTopics(Collection topics) { + public KafkaFuture createTopic(NewTopic topic) { throw new UnsupportedOperationException(); } @Override - public DeleteTopicsResult deleteTopics(Collection topics, DeleteTopicsOptions options) { + public KafkaFuture deleteTopic(String topicName) { throw new UnsupportedOperationException(); } @Override - public ListTopicsResult listTopics() { - return delegate.listTopics(); - } - - @Override - public ListTopicsResult listTopics(ListTopicsOptions options) { - return delegate.listTopics(options); - } - - @Override - public DescribeTopicsResult describeTopics(Collection topicNames) { - return delegate.describeTopics(topicNames); - } - - @Override - public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { - return delegate.describeTopics(topicNames, options); + public KafkaFuture describeTopic(String topicName) { + return delegate.describeTopic(topicName); } @Override - public DescribeClusterResult describeCluster() { + public KafkaFuture> describeCluster() { return delegate.describeCluster(); } @Override - public DescribeClusterResult describeCluster(DescribeClusterOptions options) { - return delegate.describeCluster(options); - } - - @Override - public DescribeAclsResult describeAcls(AclBindingFilter filter) { + public KafkaFuture> describeAcls(AclBindingFilter filter) { return delegate.describeAcls(filter); } @Override - public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) { - return delegate.describeAcls(filter, options); - } - - @Override - public CreateAclsResult createAcls(Collection acls) { - throw new UnsupportedOperationException(); - } - - @Override - public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteAclsResult deleteAcls(Collection filters) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteAclsResult deleteAcls(Collection filters, DeleteAclsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DescribeConfigsResult describeConfigs(Collection resources) { - return delegate.describeConfigs(resources); - } - - @Override - public DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - return delegate.describeConfigs(resources, options); - } - - @Override - @Deprecated - public AlterConfigsResult alterConfigs(Map configs) { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public AlterConfigsResult alterConfigs(Map configs, AlterConfigsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterConfigsResult incrementalAlterConfigs(Map> configs) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterConfigsResult incrementalAlterConfigs(Map> configs, - AlterConfigsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterReplicaLogDirsResult alterReplicaLogDirs(Map replicaAssignment) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterReplicaLogDirsResult alterReplicaLogDirs(Map replicaAssignment, - AlterReplicaLogDirsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DescribeLogDirsResult describeLogDirs(Collection brokers) { - return delegate.describeLogDirs(brokers); - } - - @Override - public DescribeLogDirsResult describeLogDirs(Collection brokers, DescribeLogDirsOptions options) { - return delegate.describeLogDirs(brokers, options); - } - - @Override - public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection replicas) { - return delegate.describeReplicaLogDirs(replicas); - } - - @Override - public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection replicas, - DescribeReplicaLogDirsOptions options) { - return delegate.describeReplicaLogDirs(replicas, options); - } - - @Override - public CreatePartitionsResult createPartitions(Map newPartitions) { - throw new UnsupportedOperationException(); - } - - @Override - public CreatePartitionsResult createPartitions(Map newPartitions, - CreatePartitionsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteRecordsResult deleteRecords(Map recordsToDelete) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteRecordsResult deleteRecords(Map recordsToDelete, - DeleteRecordsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public CreateDelegationTokenResult createDelegationToken() { - throw new UnsupportedOperationException(); - } - - @Override - public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) { - throw new UnsupportedOperationException(); - } - - @Override - public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac) { - throw new UnsupportedOperationException(); - } - - @Override - public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DescribeDelegationTokenResult describeDelegationToken() { - return delegate.describeDelegationToken(); - } - - @Override - public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) { - return delegate.describeDelegationToken(options); - } - - @Override - public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, - DescribeConsumerGroupsOptions options) { - return delegate.describeConsumerGroups(groupIds, options); - } - - @Override - public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds) { - return delegate.describeConsumerGroups(groupIds); - } - - @Override - public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { - return delegate.listConsumerGroups(options); - } - - @Override - public ListConsumerGroupsResult listConsumerGroups() { - return delegate.listConsumerGroups(); - } - - @Override - public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, - ListConsumerGroupOffsetsOptions options) { - return delegate.listConsumerGroupOffsets(groupId, options); - } - - @Override - public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { - return delegate.listConsumerGroupOffsets(groupId); - } - - @Override - public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, - DeleteConsumerGroupsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set partitions, - DeleteConsumerGroupOffsetsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set partitions) { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions) { - throw new UnsupportedOperationException(); - } - - @Override - @Deprecated - public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, - ElectPreferredLeadersOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public ElectLeadersResult electLeaders(ElectionType electionType, Set partitions) { - throw new UnsupportedOperationException(); - } - - @Override - public ElectLeadersResult electLeaders(ElectionType electionType, Set partitions, - ElectLeadersOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterPartitionReassignmentsResult alterPartitionReassignments( - Map> reassignments) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterPartitionReassignmentsResult alterPartitionReassignments( - Map> reassignments, - AlterPartitionReassignmentsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public ListPartitionReassignmentsResult listPartitionReassignments() { - return delegate.listPartitionReassignments(); - } - - @Override - public ListPartitionReassignmentsResult listPartitionReassignments(Set partitions) { - return delegate.listPartitionReassignments(partitions); - } - - @Override - public ListPartitionReassignmentsResult listPartitionReassignments(Set partitions, - ListPartitionReassignmentsOptions options) { - return delegate.listPartitionReassignments(partitions, options); - } - - @Override - public ListPartitionReassignmentsResult listPartitionReassignments(ListPartitionReassignmentsOptions options) { - return delegate.listPartitionReassignments(options); - } - - @Override - public ListPartitionReassignmentsResult listPartitionReassignments(Optional> partitions, - ListPartitionReassignmentsOptions options) { - return delegate.listPartitionReassignments(partitions, options); - } - - @Override - public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, - RemoveMembersFromConsumerGroupOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, - Map offsets) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, - Map offsets, AlterConsumerGroupOffsetsOptions options) { - throw new UnsupportedOperationException(); - } - - @Override - public ListOffsetsResult listOffsets(Map topicPartitionOffsets) { - return delegate.listOffsets(topicPartitionOffsets); - } - - @Override - public ListOffsetsResult listOffsets(Map topicPartitionOffsets, - ListOffsetsOptions options) { - return delegate.listOffsets(topicPartitionOffsets, options); - } - - @Override - public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter) { - return delegate.describeClientQuotas(filter); - } - - @Override - public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter clientQuotaFilter, - DescribeClientQuotasOptions describeClientQuotasOptions) { - return delegate.describeClientQuotas(clientQuotaFilter, describeClientQuotasOptions); - } - - @Override - public AlterClientQuotasResult alterClientQuotas(Collection entries) { - throw new UnsupportedOperationException(); - } - - @Override - public AlterClientQuotasResult alterClientQuotas(Collection collection, - AlterClientQuotasOptions alterClientQuotasOptions) { - throw new UnsupportedOperationException(); - } - - @Override - public DescribeUserScramCredentialsResult describeUserScramCredentials() { - return delegate.describeUserScramCredentials(); - } - - @Override - public DescribeUserScramCredentialsResult describeUserScramCredentials(List users) { - return delegate.describeUserScramCredentials(users); - } - - @Override - public DescribeUserScramCredentialsResult describeUserScramCredentials(List list, - DescribeUserScramCredentialsOptions describeUserScramCredentialsOptions) { - return delegate.describeUserScramCredentials(list, describeUserScramCredentialsOptions); - } - - @Override - public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations) { + public KafkaFuture createAcls(Collection acls) { throw new UnsupportedOperationException(); } @Override - public AlterUserScramCredentialsResult alterUserScramCredentials(List list, - AlterUserScramCredentialsOptions alterUserScramCredentialsOptions) { + public KafkaFuture> deleteAcls(Collection filters) { throw new UnsupportedOperationException(); } @Override - public DescribeFeaturesResult describeFeatures() { - return delegate.describeFeatures(); - } - - @Override - public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions describeFeaturesOptions) { - return delegate.describeFeatures(describeFeaturesOptions); + public KafkaFuture describeConfigs(ConfigResource resource) { + return delegate.describeConfigs(resource); } @Override - public UpdateFeaturesResult updateFeatures(Map map, - UpdateFeaturesOptions updateFeaturesOptions) { + public KafkaFuture incrementalAlterConfigs(ConfigResource resource, Map configValues) { throw new UnsupportedOperationException(); } - - @Override - public Map metrics() { - return delegate.metrics(); - } } diff --git a/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/UpdateApplicationAclsJob.java b/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/UpdateApplicationAclsJob.java index 14cd8692..c7245e2c 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/UpdateApplicationAclsJob.java +++ b/src/main/java/com/hermesworld/ais/galapagos/adminjobs/impl/UpdateApplicationAclsJob.java @@ -9,8 +9,7 @@ import com.hermesworld.ais.galapagos.kafka.impl.ConnectedKafkaCluster; import com.hermesworld.ais.galapagos.kafka.util.AclSupport; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.CreateAclsResult; -import org.apache.kafka.clients.admin.DeleteAclsResult; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.json.JSONException; @@ -83,13 +82,13 @@ void performUpdate(KafkaCluster cluster, Function new NoUpdatesAdminClient(client) { @Override - public CreateAclsResult createAcls(Collection acls) { + public KafkaFuture createAcls(Collection acls) { dryRunCreatedAcls.addAll(acls); return client.createAcls(List.of()); } @Override - public DeleteAclsResult deleteAcls(Collection filters) { + public KafkaFuture> deleteAcls(Collection filters) { dryRunDeletedAcls.addAll(filters); return client.deleteAcls(List.of()); } diff --git a/src/main/java/com/hermesworld/ais/galapagos/applications/controller/ApplicationsController.java b/src/main/java/com/hermesworld/ais/galapagos/applications/controller/ApplicationsController.java index 23956fa8..d83265ad 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/applications/controller/ApplicationsController.java +++ b/src/main/java/com/hermesworld/ais/galapagos/applications/controller/ApplicationsController.java @@ -71,7 +71,7 @@ public List getUserApplicationOwnerRequests() { @PutMapping(value = "/api/me/requests", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public ApplicationOwnerRequest submitApplicationOwnerRequest( @RequestBody ApplicationOwnerRequestSubmissionDto request) { - if (StringUtils.isEmpty(request.getApplicationId())) { + if (!StringUtils.hasLength(request.getApplicationId())) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Required parameter applicationId is missing or empty"); } diff --git a/src/main/java/com/hermesworld/ais/galapagos/applications/impl/ApplicationsServiceImpl.java b/src/main/java/com/hermesworld/ais/galapagos/applications/impl/ApplicationsServiceImpl.java index 2bcda47a..c1102ee1 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/applications/impl/ApplicationsServiceImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/applications/impl/ApplicationsServiceImpl.java @@ -268,7 +268,7 @@ public CompletableFuture registerApplicationOnEnvironment(S GalapagosEventSink eventSink = eventManager.newEventSink(kafkaCluster); - JSONObject oldAuthentication = existing != null && !StringUtils.isEmpty(existing.getAuthenticationJson()) + JSONObject oldAuthentication = existing != null && StringUtils.hasLength(existing.getAuthenticationJson()) ? new JSONObject(existing.getAuthenticationJson()) : new JSONObject(); diff --git a/src/main/java/com/hermesworld/ais/galapagos/ccloud/apiclient/ConfluentCloudApiClient.java b/src/main/java/com/hermesworld/ais/galapagos/ccloud/apiclient/ConfluentCloudApiClient.java index 9557c66c..de1c26fc 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/ccloud/apiclient/ConfluentCloudApiClient.java +++ b/src/main/java/com/hermesworld/ais/galapagos/ccloud/apiclient/ConfluentCloudApiClient.java @@ -289,10 +289,10 @@ private Function> errorResponseHandler // then fallback to simple exception } return new ConfluentApiException( - errorMessage + ": Server returned " + response.rawStatusCode() + " for " + uri); + errorMessage + ": Server returned " + response.statusCode().value() + " for " + uri); }).defaultIfEmpty(new ConfluentApiException( - errorMessage + ": Server returned " + response.rawStatusCode() + " for " + uri)); + errorMessage + ": Server returned " + response.statusCode().value() + " for " + uri)); } // currently, we only take the first error of the array, and use its "detail" as error message. diff --git a/src/main/java/com/hermesworld/ais/galapagos/ccloud/auth/ConfluentCloudAuthConfig.java b/src/main/java/com/hermesworld/ais/galapagos/ccloud/auth/ConfluentCloudAuthConfig.java index e6e9b158..5258b846 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/ccloud/auth/ConfluentCloudAuthConfig.java +++ b/src/main/java/com/hermesworld/ais/galapagos/ccloud/auth/ConfluentCloudAuthConfig.java @@ -26,8 +26,8 @@ public class ConfluentCloudAuthConfig { private Boolean serviceAccountIdCompatMode; public boolean isServiceAccountIdCompatMode() { - // currently (Sep 2022), true should be default for Confluent! - return Objects.requireNonNullElse(serviceAccountIdCompatMode, true); + // As Confluent Cloud now fully supports ResID-based ACLs, we do no longer have this to be the default + return Objects.requireNonNullElse(serviceAccountIdCompatMode, false); } } diff --git a/src/main/java/com/hermesworld/ais/galapagos/certificates/auth/CertificatesAuthenticationModule.java b/src/main/java/com/hermesworld/ais/galapagos/certificates/auth/CertificatesAuthenticationModule.java index b4c50e97..ef1aa376 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/certificates/auth/CertificatesAuthenticationModule.java +++ b/src/main/java/com/hermesworld/ais/galapagos/certificates/auth/CertificatesAuthenticationModule.java @@ -52,7 +52,7 @@ public CompletableFuture init() { try { Files.createDirectories(Path.of(config.getCertificatesWorkdir())); this.caManager = new CaManagerImpl(environmentId, config); - if (StringUtils.isEmpty(config.getTruststoreFile())) { + if (!StringUtils.hasLength(config.getTruststoreFile())) { createDynamicTruststore(); } else { diff --git a/src/main/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImpl.java b/src/main/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImpl.java index c246a6d2..a3f6d02b 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImpl.java @@ -373,7 +373,7 @@ private static CaData buildCaData(String environmentId, CertificatesAuthenticati if (!Arrays.equals(kp.getPublic().getEncoded(), data.getCaCertificate().getPublicKey().getEncoded())) { throw new RuntimeException("The public/private key pair does not match for certificate " - + data.getCaCertificate().getSubjectDN().getName()); + + data.getCaCertificate().getSubjectX500Principal().getName()); } data.setCaPrivateKey(kp.getPrivate()); @@ -381,7 +381,7 @@ private static CaData buildCaData(String environmentId, CertificatesAuthenticati try { data.setApplicationCertificateValidity( - StringUtils.isEmpty(config.getApplicationCertificateValidity()) ? DEFAULT_CERTIFICATE_VALIDITY + !StringUtils.hasLength(config.getApplicationCertificateValidity()) ? DEFAULT_CERTIFICATE_VALIDITY : Duration.parse(config.getApplicationCertificateValidity()).toMillis()); } catch (DateTimeParseException e) { @@ -390,7 +390,7 @@ private static CaData buildCaData(String environmentId, CertificatesAuthenticati data.setApplicationCertificateValidity(DEFAULT_CERTIFICATE_VALIDITY); } try { - data.setDeveloperCertificateValidity(StringUtils.isEmpty(config.getDeveloperCertificateValidity()) ? 0 + data.setDeveloperCertificateValidity(!StringUtils.hasLength(config.getDeveloperCertificateValidity()) ? 0 : Duration.parse(config.getDeveloperCertificateValidity()).toMillis()); } catch (DateTimeParseException e) { diff --git a/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderRunner.java b/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderRunner.java index 775bf7fd..648f2309 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderRunner.java +++ b/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderRunner.java @@ -102,7 +102,7 @@ private ZonedDateTime getDateOfExpiry(String applicationId, String environmentId private ZonedDateTime getDateOfExpiry(ApplicationMetadata metadata, String environmentId) { KafkaAuthenticationModule authModule = kafkaClusters.getAuthenticationModule(environmentId).orElseThrow(); - if (StringUtils.isEmpty(metadata.getAuthenticationJson())) { + if (!StringUtils.hasLength(metadata.getAuthenticationJson())) { return null; } ZoneId tz; diff --git a/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderServiceImpl.java b/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderServiceImpl.java index a3e9239e..2b445e09 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderServiceImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/certificates/reminders/impl/CertificateExpiryReminderServiceImpl.java @@ -58,7 +58,7 @@ public List calculateDueCertificateReminders() { Collection sentReminders = getRepository(cluster).getObjects(); for (ApplicationMetadata app : allMetadata) { - if (StringUtils.isEmpty(app.getAuthenticationJson())) { + if (!StringUtils.hasLength(app.getAuthenticationJson())) { continue; } JSONObject authData; diff --git a/src/main/java/com/hermesworld/ais/galapagos/devauth/impl/DevUserAclListener.java b/src/main/java/com/hermesworld/ais/galapagos/devauth/impl/DevUserAclListener.java index 6e46742d..a8a42c50 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/devauth/impl/DevUserAclListener.java +++ b/src/main/java/com/hermesworld/ais/galapagos/devauth/impl/DevUserAclListener.java @@ -205,7 +205,7 @@ public CompletableFuture handleTopicSchemaDeleted(TopicSchemaRemovedEvent @CheckReturnValue CompletableFuture updateAcls(KafkaCluster cluster, Set metadatas) { if (log.isDebugEnabled()) { - log.debug("Updating ACLs for {} on cluster {}", metadatas.stream().map(m -> m.getUserName()), + log.debug("Updating ACLs for {} on cluster {}", metadatas.stream().map(m -> m.getUserName()).toList(), cluster.getId()); } CompletableFuture result = CompletableFuture.completedFuture(null); diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/KafkaClusterAdminClient.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/KafkaClusterAdminClient.java new file mode 100644 index 00000000..d0b1ad90 --- /dev/null +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/KafkaClusterAdminClient.java @@ -0,0 +1,40 @@ +package com.hermesworld.ais.galapagos.kafka; + +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; + +import java.util.Collection; +import java.util.Map; + +/** + * Galapagos Interface for abstracting the not-so-helpful Kafka Admin interface. This allows for wrapping and e.g. + * KafkaFuture encapsulation.
+ * Also, it is reduced to the relevant Kafka Admin operations for Galapagos. + */ +public interface KafkaClusterAdminClient { + + KafkaFuture> deleteAcls(Collection filters); + + KafkaFuture createAcls(Collection bindings); + + KafkaFuture> describeAcls(AclBindingFilter filter); + + KafkaFuture createTopic(NewTopic topic); + + KafkaFuture deleteTopic(String topicName); + + KafkaFuture describeConfigs(ConfigResource resource); + + KafkaFuture> describeCluster(); + + KafkaFuture incrementalAlterConfigs(ConfigResource resource, Map configValues); + + KafkaFuture describeTopic(String topicName); + +} diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/config/KafkaEnvironmentsConfig.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/config/KafkaEnvironmentsConfig.java index 07d33d58..8c09011a 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/config/KafkaEnvironmentsConfig.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/config/KafkaEnvironmentsConfig.java @@ -36,7 +36,11 @@ public class KafkaEnvironmentsConfig { @Getter @Setter - private boolean readonly; + private boolean logAdminOperations; + + @Getter + @Setter + private Long adminClientRequestTimeout; @Getter @Setter @@ -64,7 +68,8 @@ public KafkaClusters kafkaClusters(KafkaExecutorFactory executorFactory, } return new ConnectedKafkaClusters(new ArrayList<>(environments), authModules, productionEnvironment, - metadataTopicsPrefix, executorFactory, replicationFactor); + metadataTopicsPrefix, executorFactory, replicationFactor, logAdminOperations, + adminClientRequestTimeout); } private void validateConfig() { diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaCluster.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaCluster.java index 3154f48d..7bb22301 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaCluster.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaCluster.java @@ -1,14 +1,13 @@ package com.hermesworld.ais.galapagos.kafka.impl; -import com.hermesworld.ais.galapagos.kafka.KafkaCluster; -import com.hermesworld.ais.galapagos.kafka.KafkaUser; -import com.hermesworld.ais.galapagos.kafka.TopicConfigEntry; -import com.hermesworld.ais.galapagos.kafka.TopicCreateParams; +import com.hermesworld.ais.galapagos.kafka.*; import com.hermesworld.ais.galapagos.kafka.util.KafkaTopicConfigHelper; import com.hermesworld.ais.galapagos.kafka.util.TopicBasedRepository; import com.hermesworld.ais.galapagos.util.FutureUtil; import com.hermesworld.ais.galapagos.util.HasKey; -import org.apache.kafka.clients.admin.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -33,11 +32,12 @@ import java.util.function.Function; import java.util.stream.Collectors; +@Slf4j public class ConnectedKafkaCluster implements KafkaCluster { private final String environmentId; - private AdminClient adminClient; + private KafkaClusterAdminClient adminClient; private final KafkaRepositoryContainer repositoryContainer; @@ -50,7 +50,7 @@ public class ConnectedKafkaCluster implements KafkaCluster { private static final long MAX_POLL_TIME = Duration.ofSeconds(10).toMillis(); public ConnectedKafkaCluster(String environmentId, KafkaRepositoryContainer repositoryContainer, - AdminClient adminClient, KafkaConsumerFactory kafkaConsumerFactory, + KafkaClusterAdminClient adminClient, KafkaConsumerFactory kafkaConsumerFactory, KafkaFutureDecoupler futureDecoupler) { this.environmentId = environmentId; this.adminClient = adminClient; @@ -66,7 +66,7 @@ public ConnectedKafkaCluster(String environmentId, KafkaRepositoryContainer repo * @param wrapperFn Function returning a new AdminClient object which should wrap the existing AdminClient (passed * to the function). It is also valid to return the AdminClient object passed to this function. */ - public void wrapAdminClient(Function wrapperFn) { + public void wrapAdminClient(Function wrapperFn) { this.adminClient = wrapperFn.apply(this.adminClient); } @@ -90,25 +90,24 @@ public CompletableFuture updateUserAcls(KafkaUser user) { return deleteAcls.isEmpty() ? CompletableFuture.completedFuture(null) : toCompletableFuture(adminClient - .deleteAcls(deleteAcls.stream().map(acl -> acl.toFilter()).collect(Collectors.toList())) - .all()); + .deleteAcls(deleteAcls.stream().map(acl -> acl.toFilter()).collect(Collectors.toList()))); }).thenCompose(o -> createAcls.isEmpty() ? CompletableFuture.completedFuture(null) - : toCompletableFuture(adminClient.createAcls(createAcls).all())); + : toCompletableFuture(adminClient.createAcls(createAcls))); } @Override public CompletableFuture removeUserAcls(KafkaUser user) { - if (user.getKafkaUserName() == null) { + String userName = user.getKafkaUserName(); + if (userName == null) { return FutureUtil.noop(); } - return toCompletableFuture( - adminClient.deleteAcls(List.of(userAclFilter(user.getKafkaUserName(), ResourceType.ANY))).all()) - .thenApply(o -> null); + return toCompletableFuture(adminClient.deleteAcls(List.of(userAclFilter(userName, ResourceType.ANY)))) + .thenApply(o -> null); } @Override public CompletableFuture visitAcls(Function callback) { - return toCompletableFuture(adminClient.describeAcls(AclBindingFilter.ANY).values()).thenAccept(acls -> { + return toCompletableFuture(adminClient.describeAcls(AclBindingFilter.ANY)).thenAccept(acls -> { for (AclBinding acl : acls) { if (!callback.apply(acl)) { break; @@ -134,7 +133,7 @@ public CompletableFuture createTopic(String topicName, TopicCreateParams t NewTopic newTopic = new NewTopic(topicName, topicCreateParams.getNumberOfPartitions(), (short) topicCreateParams.getReplicationFactor()).configs(topicCreateParams.getTopicConfigs()); - return toCompletableFuture(this.adminClient.createTopics(Set.of(newTopic)).all()); + return toCompletableFuture(this.adminClient.createTopic(newTopic)); } @Override @@ -143,53 +142,46 @@ public CompletableFuture deleteTopic(String topicName) { new ResourcePatternFilter(ResourceType.TOPIC, topicName, PatternType.LITERAL), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); - KafkaFuture deleteTopicFuture = this.adminClient.deleteTopics(Set.of(topicName)).all(); + KafkaFuture deleteTopicFuture = this.adminClient.deleteTopic(topicName); return toCompletableFuture(deleteTopicFuture) - .thenCompose(o -> toCompletableFuture(adminClient.deleteAcls(Set.of(aclFilter)).all())) - .thenApply(o -> null); + .thenCompose(o -> toCompletableFuture(adminClient.deleteAcls(Set.of(aclFilter)))).thenApply(o -> null); } @Override public CompletableFuture> getTopicConfig(String topicName) { ConfigResource cres = new ConfigResource(ConfigResource.Type.TOPIC, topicName); - return toCompletableFuture(adminClient.describeConfigs(Set.of(cres)).all()) - .thenApply(map -> map.getOrDefault(cres, new Config(Collections.emptyList())).entries().stream() - .map(entry -> new TopicConfigEntryImpl(entry)).collect(Collectors.toSet())); + return toCompletableFuture(adminClient.describeConfigs(cres)).thenApply(config -> config.entries().stream() + .map(entry -> new TopicConfigEntryImpl(entry)).collect(Collectors.toSet())); } @Override public CompletableFuture> getDefaultTopicConfig() { - return toCompletableFuture(adminClient.describeCluster().nodes()).thenCompose(nodes -> { + return toCompletableFuture(adminClient.describeCluster()).thenCompose(nodes -> { if (nodes.isEmpty()) { return CompletableFuture.failedFuture(new KafkaException("No nodes in cluster")); } - return toCompletableFuture(adminClient - .describeConfigs( - Set.of(new ConfigResource(ConfigResource.Type.BROKER, "" + nodes.iterator().next().id()))) - .all()); - }).thenApply(map -> KafkaTopicConfigHelper.getTopicDefaultValues(map.values().iterator().next())); + return toCompletableFuture(adminClient.describeConfigs( + new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodes.iterator().next().id())))); + }).thenApply(config -> KafkaTopicConfigHelper.getTopicDefaultValues(config)); } @Override public CompletableFuture setTopicConfig(String topicName, Map configValues) { - Config config = new Config(configValues.entrySet().stream().map(e -> new ConfigEntry(e.getKey(), e.getValue())) - .collect(Collectors.toSet())); - return toCompletableFuture(adminClient - .alterConfigs(Map.of(new ConfigResource(ConfigResource.Type.TOPIC, topicName), config)).all()); + .incrementalAlterConfigs(new ConfigResource(ConfigResource.Type.TOPIC, topicName), configValues)); } @Override public CompletableFuture getActiveBrokerCount() { - return toCompletableFuture(adminClient.describeCluster().nodes()).thenApply(nodes -> nodes.size()); + return toCompletableFuture(adminClient.describeCluster()).thenApply(nodes -> nodes.size()); } @Override public CompletableFuture buildTopicCreateParams(String topicName) { - return toCompletableFuture(adminClient.describeTopics(Set.of(topicName)).all()) - .thenCompose(map -> buildCreateTopicParams(map.get(topicName))); + return toCompletableFuture(adminClient.describeTopic(topicName)) + .thenCompose(desc -> buildCreateTopicParams(desc)); } @Override @@ -250,6 +242,19 @@ public void onPartitionsAssigned(Collection partitions) { return result; } + @Override + public CompletableFuture getKafkaServerVersion() { + Function toVersionString = s -> !s.contains("-") ? s : s.substring(0, s.indexOf('-')); + return toCompletableFuture(adminClient.describeCluster()).thenCompose(coll -> { + String nodeName = coll.iterator().next().idString(); + + return toCompletableFuture(adminClient.describeConfigs(new ConfigResource(Type.BROKER, nodeName))) + .thenApply(config -> config.get("inter.broker.protocol.version") == null ? "UNKNOWN_VERSION" + : config.get("inter.broker.protocol.version").value()) + .thenApply(toVersionString); + }); + } + private CompletableFuture buildCreateTopicParams(TopicDescription description) { return getTopicConfig(description.name()).thenApply(configs -> { TopicCreateParams params = new TopicCreateParams(description.partitions().size(), @@ -267,7 +272,7 @@ private CompletableFuture> getUserAcls(String username) { if (ObjectUtils.isEmpty(username)) { return CompletableFuture.completedFuture(List.of()); } - return toCompletableFuture(adminClient.describeAcls(userAclFilter(username, ResourceType.ANY)).values()); + return toCompletableFuture(adminClient.describeAcls(userAclFilter(username, ResourceType.ANY))); } private AclBindingFilter userAclFilter(String username, ResourceType resourceType) { @@ -281,19 +286,4 @@ private CompletableFuture toCompletableFuture(KafkaFuture kafkaFuture) return futureDecoupler.toCompletableFuture(kafkaFuture); } - @Override - public CompletableFuture getKafkaServerVersion() { - Function toVersionString = s -> !s.contains("-") ? s : s.substring(0, s.indexOf('-')); - return toCompletableFuture(adminClient.describeCluster().nodes()).thenCompose(coll -> { - String nodeName = coll.iterator().next().idString(); - - return toCompletableFuture(adminClient.describeConfigs( - Set.of(new ConfigResource(Type.BROKER, nodeName))).all()).thenApply(map -> map - .values().stream() - .map(config -> config.get("inter.broker.protocol.version") == null ? "UNKNOWN_VERSION" - : config.get("inter.broker.protocol.version").value()) - .findFirst().map(toVersionString).orElse("UNKNOWN_VERSION")); - }); - } - } diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusters.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusters.java index 64d5a29e..f3fd5ef2 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusters.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusters.java @@ -1,10 +1,12 @@ package com.hermesworld.ais.galapagos.kafka.impl; import com.hermesworld.ais.galapagos.kafka.KafkaCluster; +import com.hermesworld.ais.galapagos.kafka.KafkaClusterAdminClient; import com.hermesworld.ais.galapagos.kafka.KafkaClusters; import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; import com.hermesworld.ais.galapagos.kafka.auth.KafkaAuthenticationModule; import com.hermesworld.ais.galapagos.kafka.config.KafkaEnvironmentConfig; +import com.hermesworld.ais.galapagos.kafka.util.LoggingAdminClient; import com.hermesworld.ais.galapagos.kafka.util.TopicBasedRepository; import com.hermesworld.ais.galapagos.util.HasKey; import org.springframework.util.ObjectUtils; @@ -29,22 +31,22 @@ public class ConnectedKafkaClusters implements KafkaClusters { public ConnectedKafkaClusters(List environmentMetadata, Map authenticationModules, String productionEnvironmentId, - String galapagosInternalPrefix, KafkaExecutorFactory executorFactory, - int topicRepositoryReplicationFactor) { + String galapagosInternalPrefix, KafkaExecutorFactory executorFactory, int topicRepositoryReplicationFactor, + boolean logging, Long adminClientRequestTimeout) { this.environmentMetadata = environmentMetadata; this.productionEnvironmentId = productionEnvironmentId; this.authenticationModules = authenticationModules; KafkaFutureDecoupler futureDecoupler = new KafkaFutureDecoupler(executorFactory); - this.connectionManager = new KafkaConnectionManager(environmentMetadata, authenticationModules, - futureDecoupler); + this.connectionManager = new KafkaConnectionManager(environmentMetadata, authenticationModules, futureDecoupler, + adminClientRequestTimeout); for (KafkaEnvironmentConfig envMeta : environmentMetadata) { KafkaRepositoryContainerImpl repoContainer = new KafkaRepositoryContainerImpl(connectionManager, envMeta.getId(), galapagosInternalPrefix, topicRepositoryReplicationFactor); ConnectedKafkaCluster cluster = buildConnectedKafkaCluster(envMeta.getId(), connectionManager, - repoContainer, futureDecoupler); + repoContainer, futureDecoupler, logging); clusters.put(envMeta.getId(), cluster); repoContainers.add(repoContainer); } @@ -104,10 +106,15 @@ public Optional getAuthenticationModule(String enviro private static ConnectedKafkaCluster buildConnectedKafkaCluster(String environmentId, KafkaConnectionManager connectionManager, KafkaRepositoryContainer repositoryContainer, - KafkaFutureDecoupler futureDecoupler) { - return new ConnectedKafkaCluster(environmentId, repositoryContainer, - connectionManager.getAdminClient(environmentId), connectionManager.getConsumerFactory(environmentId), - futureDecoupler); + KafkaFutureDecoupler futureDecoupler, boolean logging) { + KafkaClusterAdminClient adminClient = new DefaultKafkaClusterAdminClient( + connectionManager.getAdminClient(environmentId)); + if (logging) { + adminClient = new LoggingAdminClient(environmentId, adminClient); + } + + return new ConnectedKafkaCluster(environmentId, repositoryContainer, adminClient, + connectionManager.getConsumerFactory(environmentId), futureDecoupler); } } diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/DefaultKafkaClusterAdminClient.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/DefaultKafkaClusterAdminClient.java new file mode 100644 index 00000000..35b1e362 --- /dev/null +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/DefaultKafkaClusterAdminClient.java @@ -0,0 +1,76 @@ +package com.hermesworld.ais.galapagos.kafka.impl; + +import com.hermesworld.ais.galapagos.kafka.KafkaClusterAdminClient; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DefaultKafkaClusterAdminClient implements KafkaClusterAdminClient { + + private final Admin admin; + + public DefaultKafkaClusterAdminClient(Admin admin) { + this.admin = admin; + } + + @Override + public KafkaFuture> deleteAcls(Collection filters) { + return admin.deleteAcls(filters).all(); + } + + @Override + public KafkaFuture createAcls(Collection bindings) { + return admin.createAcls(bindings).all(); + } + + @Override + public KafkaFuture> describeAcls(AclBindingFilter filter) { + return admin.describeAcls(filter).values(); + } + + @Override + public KafkaFuture createTopic(NewTopic topic) { + return admin.createTopics(Set.of(topic)).all(); + } + + @Override + public KafkaFuture deleteTopic(String topicName) { + return admin.deleteTopics(Set.of(topicName)).all(); + } + + @Override + public KafkaFuture describeConfigs(ConfigResource resource) { + return admin.describeConfigs(Set.of(resource)).values().getOrDefault(resource, + KafkaFuture.completedFuture(new Config(Set.of()))); + } + + @Override + public KafkaFuture> describeCluster() { + return admin.describeCluster().nodes(); + } + + @Override + public KafkaFuture describeTopic(String topicName) { + return admin.describeTopics(Set.of(topicName)).topicNameValues().get(topicName); + } + + @Override + public KafkaFuture incrementalAlterConfigs(ConfigResource resource, Map configValues) { + List alterOps = configValues.entrySet().stream().map(entry -> { + if (entry.getValue() == null) { + return new AlterConfigOp(new ConfigEntry(entry.getKey(), null), AlterConfigOp.OpType.DELETE); + } + return new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), AlterConfigOp.OpType.SET); + }).toList(); + + return admin.incrementalAlterConfigs(Map.of(resource, alterOps)).all(); + } +} diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaConnectionManager.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaConnectionManager.java index 61d6243a..b813997f 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaConnectionManager.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaConnectionManager.java @@ -7,6 +7,7 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -31,9 +32,13 @@ class KafkaConnectionManager { private final KafkaFutureDecoupler futureDecoupler; + private final Long adminClientRequestTimeout; + public KafkaConnectionManager(List environments, - Map authenticationModules, KafkaFutureDecoupler futureDecoupler) { + Map authenticationModules, KafkaFutureDecoupler futureDecoupler, + Long adminClientRequestTimeout) { this.futureDecoupler = futureDecoupler; + this.adminClientRequestTimeout = adminClientRequestTimeout; for (KafkaEnvironmentConfig env : environments) { String id = env.getId(); @@ -81,14 +86,14 @@ public KafkaConsumerFactory getConsumerFactory(String environmen private AdminClient buildAdminClient(KafkaEnvironmentConfig environment, KafkaAuthenticationModule authenticationModule) { - return buildAdminClient(environment, authenticationModule, new Properties()); - } - - private AdminClient buildAdminClient(KafkaEnvironmentConfig environment, - KafkaAuthenticationModule authenticationModule, Properties propsOverride) { Properties props = buildKafkaProperties(environment, authenticationModule); - props.putAll(propsOverride); - + if (adminClientRequestTimeout != null) { + props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(adminClientRequestTimeout)); + props.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, + String.valueOf(adminClientRequestTimeout)); + props.setProperty(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG, + String.valueOf(adminClientRequestTimeout)); + } return AdminClient.create(props); } diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecoupler.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecoupler.java index a9110d8e..8d161222 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecoupler.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecoupler.java @@ -1,27 +1,20 @@ package com.hermesworld.ais.galapagos.kafka.impl; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.function.Consumer; - +import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; import org.apache.kafka.common.KafkaFuture; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; -import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; /** - * Helper class which decouples the completion of {@link KafkaFuture} or {@link ListenableFuture} instances from the + * Helper class which decouples the completion of {@link KafkaFuture} or {@link CompletableFuture} instances from the * main Kafka Thread. * * @author AlbrechtFlo - * */ public class KafkaFutureDecoupler { - private KafkaExecutorFactory executorFactory; + private final KafkaExecutorFactory executorFactory; public KafkaFutureDecoupler(KafkaExecutorFactory executorFactory) { this.executorFactory = executorFactory; @@ -40,14 +33,7 @@ public KafkaFutureDecoupler(KafkaExecutorFactory executorFactory) { * completes on a Thread decoupled from the Kafka Thread. */ public CompletableFuture toCompletableFuture(KafkaFuture future) { - return toCompletableFuture(future, cb -> future.whenComplete((t, ex) -> { - if (ex != null) { - cb.onFailure(ex); - } - else { - cb.onSuccess(t); - } - })); + return decouple(kafkaFutureToCompletableFuture(future)); } /** @@ -63,75 +49,46 @@ public CompletableFuture toCompletableFuture(KafkaFuture future) { * completes on a Thread decoupled from the Kafka Thread. */ public CompletableFuture toCompletableFuture(CompletableFuture completableFuture) { - return toCompletableFuture(completableFuture, cb -> completableFuture.whenComplete((t, ex) -> { - if (ex != null) { - cb.onFailure(ex); - } - else { - cb.onSuccess(t); - } - })); + return decouple(completableFuture); } - /** - * Returns a {@link CompletableFuture} which completes when the given {@link ListenableFuture} completes. If the - * ListenableFuture is already complete, a completed Future is returned. Otherwise, the returned Future - * completes on a Thread provided by a fresh ExecutorService of the KafkaExecutorFactory - * provided for this helper class. - * - * @param Type of the value provided by the Future. - * @param future Future which may be complete, or which may complete on the Kafka Thread. - * - * @return A completable Future which may be already complete if the original Future already was complete, or which - * completes on a Thread decoupled from the Kafka Thread. - */ - public CompletableFuture toCompletableFuture(ListenableFuture future) { - return toCompletableFuture(future, cb -> future.addCallback(cb)); - } + private CompletableFuture decouple(CompletableFuture completableFuture) { + if (completableFuture.isDone()) { + return completableFuture; + } + + CompletableFuture result = new CompletableFuture<>(); + ExecutorService executor = executorFactory.newExecutor(); - private CompletableFuture toCompletableFuture(Future simpleFuture, - Consumer> callbackHookConsumer) { - if (simpleFuture.isDone()) { + completableFuture.whenComplete((res, throwable) -> { try { - return CompletableFuture.completedFuture(simpleFuture.get()); - } - catch (ExecutionException e) { - return CompletableFuture.failedFuture(e.getCause()); + executor.submit(() -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } + else { + result.complete(res); + } + }); } - catch (InterruptedException e) { - return CompletableFuture.failedFuture(e); + finally { + executor.shutdown(); } - } + }); - // this assumes that the associated operation of the KafkaFuture has already been scheduled to the KafkaAdmin - // thread - CompletableFuture result = new CompletableFuture(); - ExecutorService executor = executorFactory.newExecutor(); + return result; + } - ListenableFutureCallback callback = new ListenableFutureCallback() { - @Override - public void onFailure(Throwable ex) { - try { - executor.submit(() -> result.completeExceptionally(ex)); - } - finally { - executor.shutdown(); - } + private CompletableFuture kafkaFutureToCompletableFuture(KafkaFuture future) { + CompletableFuture result = new CompletableFuture<>(); + future.whenComplete((res, t) -> { + if (t != null) { + result.completeExceptionally(t); } - - @Override - public void onSuccess(T t) { - try { - executor.submit(() -> result.complete(t)); - } - finally { - executor.shutdown(); - } + else { + result.complete(res); } - }; - - // hook the callback into the original Future. This depends on the Future implementation (Kafka or Listenable). - callbackHookConsumer.accept(callback); + }); return result; } diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaRepositoryContainerImpl.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaRepositoryContainerImpl.java index ae2ccd8a..4a9ede3d 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaRepositoryContainerImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaRepositoryContainerImpl.java @@ -117,7 +117,7 @@ private void ensureTopicExists(String topic) { Map desc; try { - desc = this.adminClient.describeTopics(Set.of(topic)).all().get(); + desc = this.adminClient.describeTopics(Set.of(topic)).allTopicNames().get(); } catch (Exception e) { desc = Collections.emptyMap(); diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaSenderImpl.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaSenderImpl.java index a602d2ab..c55d7724 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaSenderImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaSenderImpl.java @@ -1,11 +1,10 @@ package com.hermesworld.ais.galapagos.kafka.impl; -import java.util.concurrent.CompletableFuture; - +import com.hermesworld.ais.galapagos.kafka.KafkaSender; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; -import com.hermesworld.ais.galapagos.kafka.KafkaSender; +import java.util.concurrent.CompletableFuture; /** * Wraps a KafkaTemplate to make concatenated Futures Thread-safe. @@ -15,9 +14,9 @@ */ public class KafkaSenderImpl implements KafkaSender { - private KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; - private KafkaFutureDecoupler futureDecoupler; + private final KafkaFutureDecoupler futureDecoupler; public KafkaSenderImpl(KafkaTemplate template, KafkaFutureDecoupler futureDecoupler) { this.kafkaTemplate = template; diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/util/KafkaTopicConfigHelper.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/util/KafkaTopicConfigHelper.java index 819417cb..6717fd4a 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/kafka/util/KafkaTopicConfigHelper.java +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/util/KafkaTopicConfigHelper.java @@ -1,16 +1,12 @@ package com.hermesworld.ais.galapagos.kafka.util; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - import org.apache.kafka.clients.admin.Config; import org.apache.kafka.common.config.TopicConfig; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + /** * Helper class for dealing with Kafka Topic Config properties. This class contains lots of information from the Kafka * docs, e.g. default values for some properties, and property precedence (e.g. log.retention.hours vs. @@ -19,6 +15,7 @@ * @author AlbrechtFlo * */ +@SuppressWarnings("deprecation") public class KafkaTopicConfigHelper { private static final Map CONFIG_INFOS = new LinkedHashMap<>(); @@ -48,7 +45,7 @@ public class KafkaTopicConfigHelper { CONFIG_INFOS.put(TopicConfig.FLUSH_MS_CONFIG, new ConfigValueInfo(INFINITE_MS, "log.flush.interval.ms", TopicConfig.FLUSH_MS_DOC)); CONFIG_INFOS.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, - new ConfigValueInfo("4096", "log.index.interval.bytes", TopicConfig.INDEX_INTERVAL_BYTES_DOCS)); + new ConfigValueInfo("4096", "log.index.interval.bytes", TopicConfig.INDEX_INTERVAL_BYTES_DOC)); CONFIG_INFOS.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, new ConfigValueInfo("1000012", "message.max.bytes", TopicConfig.MAX_MESSAGE_BYTES_DOC)); CONFIG_INFOS.put(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, @@ -96,14 +93,14 @@ private KafkaTopicConfigHelper() { public static Map getConfigKeysAndDescription() { Map result = new LinkedHashMap<>(); - CONFIG_INFOS.entrySet().stream().forEach(e -> result.put(e.getKey(), e.getValue().getDescription())); + CONFIG_INFOS.forEach((key, value) -> result.put(key, value.description())); return result; } public static Map getTopicDefaultValues(Config brokerConfig) { Map brokerConfigValues = new HashMap<>(); // have to use forEach instead of Map collector because values could be null - brokerConfig.entries().stream().forEach(entry -> brokerConfigValues.put(entry.name(), entry.value())); + brokerConfig.entries().forEach(entry -> brokerConfigValues.put(entry.name(), entry.value())); Map result = new HashMap<>(); @@ -112,67 +109,30 @@ public static Map getTopicDefaultValues(Config brokerConfig) { ConfigValueInfo info = entry.getValue(); String serverDefault = null; - if (brokerConfigValues.containsKey(info.getServerDefaultProperty())) { - serverDefault = brokerConfigValues.get(info.getServerDefaultProperty()); + if (brokerConfigValues.containsKey(info.serverDefaultProperty())) { + serverDefault = brokerConfigValues.get(info.serverDefaultProperty()); } if (serverDefault == null && SECONDARY_SERVER_PROPS.containsKey(configKey)) { for (SecondaryServerProp prop : SECONDARY_SERVER_PROPS.get(configKey)) { - if (brokerConfigValues.get(prop.getConfigName()) != null) { - serverDefault = prop.apply(brokerConfigValues.get(prop.getConfigName())); + if (brokerConfigValues.get(prop.configName()) != null) { + serverDefault = prop.apply(brokerConfigValues.get(prop.configName())); break; } } } - result.put(configKey, serverDefault == null ? info.getDefaultValue() : serverDefault); + result.put(configKey, serverDefault == null ? info.defaultValue() : serverDefault); } return result; } - private static class ConfigValueInfo { - - private String defaultValue; - - private String serverDefaultProperty; - - private String description; - - public ConfigValueInfo(String defaultValue, String serverDefaultProperty, String description) { - this.defaultValue = defaultValue; - this.serverDefaultProperty = serverDefaultProperty; - this.description = description; - } - - public String getDefaultValue() { - return defaultValue; - } - - public String getServerDefaultProperty() { - return serverDefaultProperty; - } - - public String getDescription() { - return description; - } + private record ConfigValueInfo(String defaultValue, String serverDefaultProperty, String description) { } - private static class SecondaryServerProp { - - private String configName; - - private Function mappingFunction; - - public SecondaryServerProp(String configName, Function mappingFunction) { - this.configName = configName; - this.mappingFunction = mappingFunction; - } - - public String getConfigName() { - return configName; - } + private record SecondaryServerProp(String configName, Function mappingFunction) { public String apply(String baseConfigValue) { return mappingFunction.apply(baseConfigValue); diff --git a/src/main/java/com/hermesworld/ais/galapagos/kafka/util/LoggingAdminClient.java b/src/main/java/com/hermesworld/ais/galapagos/kafka/util/LoggingAdminClient.java new file mode 100644 index 00000000..f2e63899 --- /dev/null +++ b/src/main/java/com/hermesworld/ais/galapagos/kafka/util/LoggingAdminClient.java @@ -0,0 +1,96 @@ +package com.hermesworld.ais.galapagos.kafka.util; + +import com.hermesworld.ais.galapagos.kafka.KafkaClusterAdminClient; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.config.ConfigResource; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +@Slf4j +public class LoggingAdminClient implements KafkaClusterAdminClient { + + private final String clusterId; + + private final KafkaClusterAdminClient delegate; + + public LoggingAdminClient(String clusterId, KafkaClusterAdminClient delegate) { + if (delegate instanceof LoggingAdminClient) { + throw new IllegalArgumentException("Cannot create a logging Admin Client on a logging Admin Client"); + } + this.clusterId = clusterId; + this.delegate = delegate; + } + + @Override + public KafkaFuture> deleteAcls(Collection filters) { + return logOperation("deleteAcls", filters, () -> delegate.deleteAcls(filters)); + } + + @Override + public KafkaFuture createAcls(Collection bindings) { + return logOperation("createAcls", bindings, () -> delegate.createAcls(bindings)); + } + + @Override + public KafkaFuture> describeAcls(AclBindingFilter filter) { + return logOperation("describeAcls", filter, () -> delegate.describeAcls(filter)); + } + + @Override + public KafkaFuture createTopic(NewTopic topic) { + return logOperation("createTopic", topic, () -> delegate.createTopic(topic)); + } + + @Override + public KafkaFuture deleteTopic(String topicName) { + return logOperation("deleteTopic", topicName, () -> delegate.deleteTopic(topicName)); + } + + @Override + public KafkaFuture describeConfigs(ConfigResource resource) { + return logOperation("describeConfigs", resource, () -> delegate.describeConfigs(resource)); + } + + @Override + public KafkaFuture> describeCluster() { + return logOperation("describeCluster", "cluster", () -> delegate.describeCluster()); + } + + @Override + public KafkaFuture incrementalAlterConfigs(ConfigResource resource, Map configValues) { + return logOperation("incrementalAlterConfigs", Set.of(resource, configValues), + () -> delegate.incrementalAlterConfigs(resource, configValues)); + } + + @Override + public KafkaFuture describeTopic(String topicName) { + return logOperation("describeTopic", topicName, () -> delegate.describeTopic(topicName)); + } + + private KafkaFuture logOperation(String opText, Object logKey, Supplier> future) { + long startTime = System.currentTimeMillis(); + log.info("Kafka AdminClient Call on cluster {}: {} ({})", clusterId, opText, logKey); + + return future.get().whenComplete((v, t) -> logFutureComplete(opText, logKey, t, startTime)); + } + + private void logFutureComplete(String opText, Object logKey, Throwable error, long startTime) { + long totalTime = System.currentTimeMillis() - startTime; + if (error != null) { + log.error("Kafka operation {} for {} FAILED after {} ms", opText, logKey, totalTime, error); + } + else { + log.info("Kafka operation {} for {} COMPLETE after {} ms", opText, logKey, totalTime); + } + } +} diff --git a/src/main/java/com/hermesworld/ais/galapagos/naming/config/NamingConfig.java b/src/main/java/com/hermesworld/ais/galapagos/naming/config/NamingConfig.java index 4a26b54a..b28d07ba 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/naming/config/NamingConfig.java +++ b/src/main/java/com/hermesworld/ais/galapagos/naming/config/NamingConfig.java @@ -5,10 +5,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.lang.NonNull; +import org.springframework.util.StringUtils; import org.springframework.validation.Errors; import org.springframework.validation.Validator; import org.springframework.validation.annotation.Validated; -import org.thymeleaf.util.StringUtils; @Configuration @ConfigurationProperties("galapagos.naming") @@ -53,7 +53,7 @@ public void validate(@NonNull Object target, @NonNull Errors errors) { checkValidFormat(target.toString(), errors); } else if (target instanceof AdditionNamingRules rules) { - if (!StringUtils.isEmpty(rules.getAllowedSeparators()) + if (StringUtils.hasLength(rules.getAllowedSeparators()) && !rules.getAllowedSeparators().matches(KAFKA_VALID_NAMES_REGEX)) { errors.rejectValue("allowedSeparators", "invalid.value", "The separators must be valid for use in Kafka Topic Names. Only dots, underscores, and hyphens are allowed."); diff --git a/src/main/java/com/hermesworld/ais/galapagos/naming/impl/NamingServiceImpl.java b/src/main/java/com/hermesworld/ais/galapagos/naming/impl/NamingServiceImpl.java index 070f9088..94cc5960 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/naming/impl/NamingServiceImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/naming/impl/NamingServiceImpl.java @@ -128,7 +128,7 @@ private boolean matchesFormat(String topicName, String format, AdditionNamingRul String addition = topicName.substring(commonPrefix.length()); - List sections = StringUtils.isEmpty(additionRules.getAllowedSeparators()) ? List.of(addition) + List sections = !StringUtils.hasLength(additionRules.getAllowedSeparators()) ? List.of(addition) : Arrays.asList(addition.split("[" + additionRules.getAllowedSeparators().replace("-", "\\-") + "]")); for (String section : sections) { diff --git a/src/main/java/com/hermesworld/ais/galapagos/notifications/impl/NotificationServiceImpl.java b/src/main/java/com/hermesworld/ais/galapagos/notifications/impl/NotificationServiceImpl.java index 72ae08ef..1d090d2c 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/notifications/impl/NotificationServiceImpl.java +++ b/src/main/java/com/hermesworld/ais/galapagos/notifications/impl/NotificationServiceImpl.java @@ -8,6 +8,10 @@ import com.hermesworld.ais.galapagos.subscriptions.SubscriptionMetadata; import com.hermesworld.ais.galapagos.subscriptions.service.SubscriptionService; import com.hermesworld.ais.galapagos.topics.service.TopicService; +import jakarta.mail.MessagingException; +import jakarta.mail.internet.AddressException; +import jakarta.mail.internet.InternetAddress; +import jakarta.mail.internet.MimeMessage; import lombok.extern.slf4j.Slf4j; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; @@ -23,10 +27,6 @@ import org.thymeleaf.ITemplateEngine; import org.thymeleaf.context.Context; -import jakarta.mail.MessagingException; -import jakarta.mail.internet.AddressException; -import jakarta.mail.internet.InternetAddress; -import jakarta.mail.internet.MimeMessage; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -95,7 +95,7 @@ public CompletableFuture notifySubscribers(String environmentId, String to .filter(req -> req.getState().equals(RequestState.APPROVED) && applicationId.equals(req.getApplicationId()) && !finalExcludeUser.equals(req.getUserName())) - .map(ApplicationOwnerRequest::getNotificationEmailAddress).filter(s -> !StringUtils.isEmpty(s)) + .map(ApplicationOwnerRequest::getNotificationEmailAddress).filter(s -> StringUtils.hasLength(s)) .collect(Collectors.toSet())); } @@ -105,7 +105,7 @@ public CompletableFuture notifySubscribers(String environmentId, String to @Override public CompletableFuture notifyRequestor(ApplicationOwnerRequest request, NotificationParams notificationParams) { - if (StringUtils.isEmpty(request.getNotificationEmailAddress())) { + if (!StringUtils.hasLength(request.getNotificationEmailAddress())) { log.warn("Could not send e-mail to requestor: no e-mail address found in request " + request.getId()); return CompletableFuture.completedFuture(null); } diff --git a/src/main/java/com/hermesworld/ais/galapagos/topics/controller/TopicController.java b/src/main/java/com/hermesworld/ais/galapagos/topics/controller/TopicController.java index 98df1f91..851bcf5f 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/topics/controller/TopicController.java +++ b/src/main/java/com/hermesworld/ais/galapagos/topics/controller/TopicController.java @@ -17,6 +17,7 @@ import com.hermesworld.ais.galapagos.topics.TopicMetadata; import com.hermesworld.ais.galapagos.topics.TopicType; import com.hermesworld.ais.galapagos.topics.service.ValidatingTopicService; +import jakarta.validation.Valid; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.KafkaException; @@ -28,7 +29,6 @@ import org.springframework.web.bind.annotation.*; import org.springframework.web.server.ResponseStatusException; -import jakarta.validation.Valid; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -112,7 +112,7 @@ public void addProducerToTopic(@PathVariable String environmentId, @PathVariable throw new ResponseStatusException(HttpStatus.FORBIDDEN); } - if (StringUtils.isEmpty(producer.getProducerApplicationId())) { + if (!StringUtils.hasLength(producer.getProducerApplicationId())) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST); } @@ -191,7 +191,7 @@ public void updateTopic(@PathVariable String environmentId, @PathVariable String return; } - if (!StringUtils.isEmpty(request.getDeprecationText())) { + if (StringUtils.hasLength(request.getDeprecationText())) { if (request.getEolDate() == null) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "eolDate must be set for Topic deprecation"); @@ -228,7 +228,7 @@ public void updateTopicConfig(@PathVariable String environmentId, @PathVariable } for (UpdateTopicConfigEntryDto config : configs) { - if (StringUtils.isEmpty(config.getName()) || StringUtils.isEmpty(config.getValue())) { + if (!StringUtils.hasLength(config.getName()) || !StringUtils.hasLength(config.getValue())) { throw badRequest.get(); } } @@ -246,7 +246,7 @@ public void updateTopicConfig(@PathVariable String environmentId, @PathVariable @PostMapping(value = "/api/util/topicname", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public TopicNameDto getTopicNameSuggestion(@RequestBody TopicNameSuggestionQueryDto query) { - if (StringUtils.isEmpty(query.getApplicationId()) || StringUtils.isEmpty(query.getEnvironmentId()) + if (!StringUtils.hasLength(query.getApplicationId()) || !StringUtils.hasLength(query.getEnvironmentId()) || query.getTopicType() == null) { throw badRequest.get(); } @@ -288,7 +288,7 @@ public TopicDto createTopic(@PathVariable String environmentId, @RequestBody Cre throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing topic type"); } - if (StringUtils.isEmpty(topicData.getName())) { + if (!StringUtils.hasLength(topicData.getName())) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Missing topic name"); } @@ -372,7 +372,7 @@ public ResponseEntity addTopicSchemaVersion(@PathVariable String environ throw new ResponseStatusException(HttpStatus.FORBIDDEN); } - if (schemaVersionDto == null || StringUtils.isEmpty(schemaVersionDto.getJsonSchema())) { + if (schemaVersionDto == null || !StringUtils.hasLength(schemaVersionDto.getJsonSchema())) { throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "JSON Schema (jsonSchema property) is missing from request body"); } diff --git a/src/main/java/com/hermesworld/ais/galapagos/uisupport/controller/UISupportController.java b/src/main/java/com/hermesworld/ais/galapagos/uisupport/controller/UISupportController.java index 44158b7a..c0466925 100644 --- a/src/main/java/com/hermesworld/ais/galapagos/uisupport/controller/UISupportController.java +++ b/src/main/java/com/hermesworld/ais/galapagos/uisupport/controller/UISupportController.java @@ -127,7 +127,7 @@ public String getFrameworkConfigTemplate(@PathVariable String environmentId, @Pa @PostMapping(value = "/api/util/topic-create-defaults", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) public TopicCreateDefaultsDto getDefaultTopicCreateParams(@RequestBody QueryTopicCreateDefaultsDto query) { TopicCreateDefaultsDto result = new TopicCreateDefaultsDto(); - if (!StringUtils.isEmpty(query.getApplicationId()) && !StringUtils.isEmpty(query.getEnvironmentId()) + if (StringUtils.hasLength(query.getApplicationId()) && StringUtils.hasLength(query.getEnvironmentId()) && query.getTopicType() != null) { result.setTopicNameSuggestion(getTopicNameSuggestion(query)); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index cc469101..ce7be673 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -85,6 +85,12 @@ galapagos.kafka.certificates-workdir=file:/tmp # The prefix for Galapagos internal topics galapagos.kafka.metadataTopicsPrefix=galapagos.internal. +# Enable this to log all KafkaAdmin operations in detail +galapagos.kafka.logAdminOperations=false + +# The timeout for Kafka AdminClient operations +galapagos.kafka.adminClientRequestTimeout=30000 + # entries specifies the minimum number of changes in the dashboard. # minDays indicates that all changes since X days ago are displayed in the dashboard. # This Setting only impacts the UI and doesn't change the REST endpoint. The bigger value gets used. diff --git a/src/main/resources/keycloak.json b/src/main/resources/keycloak.json deleted file mode 100644 index a66944e2..00000000 --- a/src/main/resources/keycloak.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "auth-server-url": "https://my-keycloak.mycompany.int/auth/", - "realm": "galapagos", - "resource": "galapagos-webapp", - "public-client": true, - "use-resource-role-mappings": true, - "principal-attribute": "preferred_username" -} diff --git a/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/GenerateToolingCertificateJobTest.java b/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/GenerateToolingCertificateJobTest.java index f0f7e5aa..e911cc1f 100644 --- a/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/GenerateToolingCertificateJobTest.java +++ b/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/GenerateToolingCertificateJobTest.java @@ -130,7 +130,7 @@ void testStandard() throws Exception { byte[] readData = StreamUtils.copyToByteArray(fis); X509Certificate cert = extractCertificate(readData); - assertEquals("galapagos", CertificateUtil.extractCn(cert.getSubjectDN().getName())); + assertEquals("galapagos", CertificateUtil.extractCn(cert.getSubjectX500Principal().getName())); // and no data on STDOUT assertFalse(stdoutData.toString().contains(DATA_MARKER)); @@ -170,7 +170,7 @@ void testDataOnStdout() throws Exception { byte[] readData = Base64.getDecoder().decode(line); X509Certificate cert = extractCertificate(readData); - assertEquals("galapagos", CertificateUtil.extractCn(cert.getSubjectDN().getName())); + assertEquals("galapagos", CertificateUtil.extractCn(cert.getSubjectX500Principal().getName())); } private X509Certificate extractCertificate(byte[] p12Data) diff --git a/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/ImportKnownApplicationsJobTest.java b/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/ImportKnownApplicationsJobTest.java index 8ad6dcbb..f76ff45c 100644 --- a/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/ImportKnownApplicationsJobTest.java +++ b/src/test/java/com/hermesworld/ais/galapagos/adminjobs/impl/ImportKnownApplicationsJobTest.java @@ -1,12 +1,5 @@ package com.hermesworld.ais.galapagos.adminjobs.impl; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.PrintStream; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Optional; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.hermesworld.ais.galapagos.applications.impl.KnownApplicationImpl; @@ -14,24 +7,31 @@ import com.hermesworld.ais.galapagos.kafka.KafkaClusters; import com.hermesworld.ais.galapagos.kafka.impl.TopicBasedRepositoryMock; import com.hermesworld.ais.galapagos.util.JsonUtil; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.boot.ApplicationArguments; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.springframework.boot.ApplicationArguments; class ImportKnownApplicationsJobTest { private KafkaClusters kafkaClusters; - private File fileWithOutInfoUrl = new File("src/test/resources/test-applications.json"); + private final File fileWithOutInfoUrl = new File("src/test/resources/test-applications.json"); - private File fileWithInfoUrl = new File("src/test/resources/test-applications-infoUrl.json"); + private final File fileWithInfoUrl = new File("src/test/resources/test-applications-infoUrl.json"); private TopicBasedRepositoryMock appRepository; @@ -53,15 +53,13 @@ void setUp() { @Test void reImportAfterAppChanges() throws Exception { - - List knownApplications = mapper.readValue(fileWithOutInfoUrl, - new TypeReference>() { - }); + List knownApplications = mapper.readValue(fileWithOutInfoUrl, new TypeReference<>() { + }); ImportKnownApplicationsJob job = new ImportKnownApplicationsJob(kafkaClusters); ApplicationArguments args = mock(ApplicationArguments.class); when(args.getOptionValues("applications.import.file")).thenReturn(List.of(fileWithInfoUrl.getPath())); - knownApplications.forEach(app -> appRepository.save(app)); + knownApplications.forEach(app -> safeGet(appRepository.save(app))); // redirect STDOUT to check update count ByteArrayOutputStream buffer = new ByteArrayOutputStream(); @@ -75,25 +73,24 @@ void reImportAfterAppChanges() throws Exception { System.setOut(oldOut); } - String output = new String(buffer.toByteArray(), StandardCharsets.UTF_8); + String output = buffer.toString(StandardCharsets.UTF_8); assertTrue(output.contains("\n1 new application(s) imported.")); + // noinspection OptionalGetWithoutIsPresent assertEquals("https://www.google.com", appRepository.getObject("app-1").get().getInfoUrl().toString()); - } @Test void importApps_alreadyIdentical() throws Exception { - List knownApplications = mapper.readValue(fileWithOutInfoUrl, - new TypeReference>() { - }); + List knownApplications = mapper.readValue(fileWithOutInfoUrl, new TypeReference<>() { + }); ImportKnownApplicationsJob job = new ImportKnownApplicationsJob(kafkaClusters); ApplicationArguments args = mock(ApplicationArguments.class); when(args.getOptionValues("applications.import.file")).thenReturn(List.of(fileWithOutInfoUrl.getPath())); TopicBasedRepositoryMock appRepository = new TopicBasedRepositoryMock<>(); - knownApplications.forEach(app -> appRepository.save(app)); + knownApplications.forEach(app -> safeGet(appRepository.save(app))); when(kafkaClusters.getGlobalRepository("known-applications", KnownApplicationImpl.class)) .thenReturn(appRepository); @@ -109,7 +106,7 @@ void importApps_alreadyIdentical() throws Exception { System.setOut(oldOut); } - String output = new String(buffer.toByteArray(), StandardCharsets.UTF_8); + String output = buffer.toString(StandardCharsets.UTF_8); assertTrue(output.contains("\n0 new application(s) imported.")); @@ -137,13 +134,23 @@ void importApps_positiv() throws Exception { System.setOut(oldOut); } - String output = new String(buffer.toByteArray(), StandardCharsets.UTF_8); + String output = buffer.toString(StandardCharsets.UTF_8); assertTrue(output.contains("\n5 new application(s) imported.")); assertEquals(5, appRepository.getObjects().size()); assertTrue(appRepository.getObject("2222").isPresent()); + // noinspection OptionalGetWithoutIsPresent assertEquals("High Five", appRepository.getObject("F.I.V.E").get().getName()); } + private void safeGet(CompletableFuture future) { + try { + future.get(); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } \ No newline at end of file diff --git a/src/test/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImplTest.java b/src/test/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImplTest.java index f2c4771e..25f4ae47 100644 --- a/src/test/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImplTest.java +++ b/src/test/java/com/hermesworld/ais/galapagos/certificates/impl/CaManagerImplTest.java @@ -104,7 +104,8 @@ void testExtendCertificate() throws Exception { assertEquals("CN=quattro,OU=certification_12345", result.getDn()); // to be VERY sure, also inspect certificate (note that toString() output is slightly different) - assertEquals("CN=quattro, OU=certification_12345", result.getCertificate().getSubjectDN().toString()); + assertEquals("CN=quattro, OU=certification_12345", + result.getCertificate().getSubjectX500Principal().toString()); } @Test diff --git a/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/AdminClientStub.java b/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/AdminClientStub.java index 6a37b264..4b8d60ff 100644 --- a/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/AdminClientStub.java +++ b/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/AdminClientStub.java @@ -1,27 +1,21 @@ package com.hermesworld.ais.galapagos.kafka.impl; -import org.apache.kafka.clients.admin.*; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.*; +import com.hermesworld.ais.galapagos.kafka.KafkaClusterAdminClient; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.internals.KafkaFutureImpl; -import org.apache.kafka.common.quota.ClientQuotaAlteration; -import org.apache.kafka.common.quota.ClientQuotaFilter; -import org.mockito.Mockito; import org.springframework.kafka.KafkaException; -import java.time.Duration; import java.util.*; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import static org.mockito.Mockito.when; - -@SuppressWarnings("deprecation") -public class AdminClientStub extends AdminClient { +public class AdminClientStub implements KafkaClusterAdminClient { private final List aclBindings = new ArrayList<>(); @@ -48,314 +42,63 @@ public void setFailOnDescribeCluster(boolean failOnDescribeCluster) { } @Override - public void close(long duration, TimeUnit unit) { - } - - @Override - public void close(Duration timeout) { - } - - @Override - public CreateTopicsResult createTopics(Collection newTopics, CreateTopicsOptions options) { - topics.addAll(newTopics); - CreateTopicsResult result = Mockito.mock(CreateTopicsResult.class); - Mockito.when(result.all()).thenReturn(completedFuture(null)); - return result; + public KafkaFuture createTopic(NewTopic topic) { + topics.add(topic); + return completedFuture(null); } @Override - public DeleteTopicsResult deleteTopics(Collection topics, DeleteTopicsOptions options) { - // Auto-generated method stub - return null; + public KafkaFuture deleteTopic(String topicName) { + topics.removeIf(t -> topicName.equals(t.name())); + return completedFuture(null); } @Override - public ListTopicsResult listTopics(ListTopicsOptions options) { - // Auto-generated method stub - return null; - } + public KafkaFuture describeTopic(String topicName) { + if (topics.stream().anyMatch(t -> topicName.equals(t.name()))) { + return completedFuture(new TopicDescription(topicName, false, List.of())); + } - @Override - public DescribeTopicsResult describeTopics(Collection topicNames, DescribeTopicsOptions options) { - // Auto-generated method stub - return null; + return completedFuture(null); } @Override - public DescribeClusterResult describeCluster(DescribeClusterOptions options) { - DescribeClusterResult result = Mockito.mock(DescribeClusterResult.class); + public KafkaFuture> describeCluster() { Node node = new Node(1, "localhost", 1); if (failOnDescribeCluster) { - when(result.nodes()).thenReturn(failingFuture(new KafkaException("Kafka failed"))); - } - else { - when(result.nodes()).thenReturn(completedFuture(List.of(node))); + return failingFuture(new KafkaException("Kafka failed")); } - return result; + return completedFuture(List.of(node)); } @Override - public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) { - List matches = this.aclBindings.stream().filter(filter::matches).collect(Collectors.toList()); - DescribeAclsResult result = Mockito.mock(DescribeAclsResult.class); - Mockito.when(result.values()).thenReturn(completedFuture(matches)); - return result; + public KafkaFuture> describeAcls(AclBindingFilter filter) { + List matches = this.aclBindings.stream().filter(filter::matches).toList(); + return completedFuture(matches); } @Override - public CreateAclsResult createAcls(Collection acls, CreateAclsOptions options) { - this.aclBindings.addAll(acls); - CreateAclsResult result = Mockito.mock(CreateAclsResult.class); - Mockito.when(result.all()).thenReturn(completedFuture(null)); - return result; + public KafkaFuture createAcls(Collection bindings) { + this.aclBindings.addAll(bindings); + return completedFuture(null); } @Override - public DeleteAclsResult deleteAcls(Collection filters, DeleteAclsOptions options) { + public KafkaFuture> deleteAcls(Collection filters) { Set removes = new HashSet<>(); filters.forEach(filter -> this.aclBindings.stream().filter(filter::matches).forEach(removes::add)); this.aclBindings.removeAll(removes); - DeleteAclsResult result = Mockito.mock(DeleteAclsResult.class); - Mockito.when(result.all()).thenReturn(completedFuture(null)); - return result; - } - - @Override - public DescribeConfigsResult describeConfigs(Collection resources, DescribeConfigsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public AlterConfigsResult alterConfigs(Map configs, AlterConfigsOptions options) { - // currently not implemented in stub - AlterConfigsResult result = Mockito.mock(AlterConfigsResult.class); - Mockito.when(result.all()).thenReturn(completedFuture(null)); - return result; - } - - @Override - public AlterConfigsResult incrementalAlterConfigs(Map> map, - AlterConfigsOptions alterConfigsOptions) { - return null; - } - - @Override - public AlterReplicaLogDirsResult alterReplicaLogDirs(Map replicaAssignment, - AlterReplicaLogDirsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeLogDirsResult describeLogDirs(Collection brokers, DescribeLogDirsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection replicas, - DescribeReplicaLogDirsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public CreatePartitionsResult createPartitions(Map newPartitions, - CreatePartitionsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DeleteRecordsResult deleteRecords(Map recordsToDelete, - DeleteRecordsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, - DescribeConsumerGroupsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, - ListConsumerGroupOffsetsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DeleteConsumerGroupsResult deleteConsumerGroups(Collection groupIds, - DeleteConsumerGroupsOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String s, Set set, - DeleteConsumerGroupOffsetsOptions deleteConsumerGroupOffsetsOptions) { - return null; - } - - @Override - public ElectPreferredLeadersResult electPreferredLeaders(Collection partitions, - ElectPreferredLeadersOptions options) { - // Auto-generated method stub - return null; - } - - @Override - public ElectLeadersResult electLeaders(ElectionType electionType, Set set, - ElectLeadersOptions electLeadersOptions) { - return null; - } - - @Override - public AlterPartitionReassignmentsResult alterPartitionReassignments( - Map> map, - AlterPartitionReassignmentsOptions alterPartitionReassignmentsOptions) { - return null; - } - - @Override - public ListPartitionReassignmentsResult listPartitionReassignments(Optional> optional, - ListPartitionReassignmentsOptions listPartitionReassignmentsOptions) { - return null; - } - - @Override - public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String s, - RemoveMembersFromConsumerGroupOptions removeMembersFromConsumerGroupOptions) { - return null; - } - - @Override - public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String s, - Map map, - AlterConsumerGroupOffsetsOptions alterConsumerGroupOffsetsOptions) { - return null; - } - - @Override - public ListOffsetsResult listOffsets(Map map, ListOffsetsOptions listOffsetsOptions) { - return null; - } - - @Override - public Map metrics() { - // Auto-generated method stub - return null; - } - - @Override - public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter clientQuotaFilter, - DescribeClientQuotasOptions describeClientQuotasOptions) { - // Auto-generated method stub - return null; - } - - @Override - public AlterClientQuotasResult alterClientQuotas(Collection entries) { - // Auto-generated method stub - return null; - } - - @Override - public AlterClientQuotasResult alterClientQuotas(Collection collection, - AlterClientQuotasOptions alterClientQuotasOptions) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeUserScramCredentialsResult describeUserScramCredentials() { - // Auto-generated method stub - return null; - } - - @Override - public DescribeUserScramCredentialsResult describeUserScramCredentials(List users) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeUserScramCredentialsResult describeUserScramCredentials(List list, - DescribeUserScramCredentialsOptions describeUserScramCredentialsOptions) { - // Auto-generated method stub - return null; - } - - @Override - public AlterUserScramCredentialsResult alterUserScramCredentials(List alterations) { - // Auto-generated method stub - return null; - } - - @Override - public AlterUserScramCredentialsResult alterUserScramCredentials(List list, - AlterUserScramCredentialsOptions alterUserScramCredentialsOptions) { - // Auto-generated method stub - return null; - } - - @Override - public DescribeFeaturesResult describeFeatures() { - // Auto-generated method stub - return null; + return completedFuture(removes); } @Override - public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions describeFeaturesOptions) { - // Auto-generated method stub - return null; + public KafkaFuture describeConfigs(ConfigResource resource) { + return completedFuture(null); } @Override - public UpdateFeaturesResult updateFeatures(Map map, - UpdateFeaturesOptions updateFeaturesOptions) { - // Auto-generated method stub - return null; + public KafkaFuture incrementalAlterConfigs(ConfigResource resource, Map configValues) { + return completedFuture(null); } private KafkaFuture completedFuture(T value) { diff --git a/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusterTest.java b/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusterTest.java index f0d8e97c..715493a6 100644 --- a/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusterTest.java +++ b/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/ConnectedKafkaClusterTest.java @@ -1,7 +1,13 @@ package com.hermesworld.ais.galapagos.kafka.impl; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.mock; +import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; +import com.hermesworld.ais.galapagos.kafka.KafkaUser; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.acl.*; +import org.apache.kafka.common.resource.PatternType; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collection; @@ -9,19 +15,8 @@ import java.util.Set; import java.util.concurrent.Executors; -import org.apache.kafka.clients.admin.CreateAclsResult; -import org.apache.kafka.clients.admin.DeleteAclsResult; -import org.apache.kafka.common.acl.AccessControlEntry; -import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.acl.AclBindingFilter; -import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.acl.AclPermissionType; -import org.apache.kafka.common.resource.PatternType; -import org.apache.kafka.common.resource.ResourcePattern; -import org.apache.kafka.common.resource.ResourceType; -import org.junit.jupiter.api.Test; -import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; -import com.hermesworld.ais.galapagos.kafka.KafkaUser; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; class ConnectedKafkaClusterTest { @@ -32,13 +27,13 @@ void testUpdateAcls() throws Exception { AdminClientStub adminClient = new AdminClientStub() { @Override - public CreateAclsResult createAcls(Collection acls) { + public KafkaFuture createAcls(Collection acls) { createdAcls.addAll(acls); return super.createAcls(acls); } @Override - public DeleteAclsResult deleteAcls(Collection filters) { + public KafkaFuture> deleteAcls(Collection filters) { deletedAcls.addAll(filters); return super.deleteAcls(filters); } diff --git a/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecouplerTest.java b/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecouplerTest.java index 7736f20f..f99353df 100644 --- a/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecouplerTest.java +++ b/src/test/java/com/hermesworld/ais/galapagos/kafka/impl/KafkaFutureDecouplerTest.java @@ -1,19 +1,6 @@ package com.hermesworld.ais.galapagos.kafka.impl; -import static org.junit.jupiter.api.Assertions.*; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - +import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; @@ -22,36 +9,32 @@ import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.KafkaException; -import org.springframework.util.concurrent.FailureCallback; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; -import org.springframework.util.concurrent.SuccessCallback; -import com.hermesworld.ais.galapagos.kafka.KafkaExecutorFactory; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; class KafkaFutureDecouplerTest { - private static ThreadFactory tfAdminClient = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "admin-client-" + System.currentTimeMillis()); - } - }; + private static final ThreadFactory tfAdminClient = r -> new Thread(r, "admin-client-" + System.currentTimeMillis()); - private static ThreadFactory tfDecoupled = new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, "decoupled-" + System.currentTimeMillis()); - } - }; + private static final ThreadFactory tfDecoupled = r -> new Thread(r, "decoupled-" + System.currentTimeMillis()); + + private static final KafkaExecutorFactory adminClientExecutorFactory = () -> Executors + .newSingleThreadExecutor(tfAdminClient); - private static KafkaExecutorFactory executorFactory = () -> { - return Executors.newSingleThreadExecutor(tfDecoupled); - }; + private static final KafkaExecutorFactory executorFactory = () -> Executors.newSingleThreadExecutor(tfDecoupled); private AdminClientStub adminClient; @@ -61,16 +44,11 @@ void initAdminClient() { adminClient.setKafkaThreadFactory(tfAdminClient); } - @AfterEach - void closeAdminClient() { - adminClient.close(); - } - @Test void testDecoupling_kafkaFuture() throws Exception { // first, test that the futures usually would complete on our Threads AtomicBoolean onAdminClientThread = new AtomicBoolean(); - adminClient.describeCluster().nodes().thenApply(c -> { + adminClient.describeCluster().thenApply(c -> { onAdminClientThread.set(Thread.currentThread().getName().startsWith("admin-client-")); return null; }).get(); @@ -81,7 +59,7 @@ void testDecoupling_kafkaFuture() throws Exception { KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(executorFactory); onAdminClientThread.set(false); - decoupler.toCompletableFuture(adminClient.describeCluster().nodes()).thenCompose(o -> { + decoupler.toCompletableFuture(adminClient.describeCluster()).thenCompose(o -> { onAdminClientThread.set(Thread.currentThread().getName().startsWith("admin-client-")); return CompletableFuture.completedFuture(null); }).get(); @@ -89,19 +67,41 @@ void testDecoupling_kafkaFuture() throws Exception { assertFalse(onAdminClientThread.get()); } + @Test + void testDecoupling_completableFuture() throws Exception { + AtomicBoolean onAdminClientThread = new AtomicBoolean(); + + // after decoupling, future should complete on another Thread + KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(executorFactory); + + CompletableFuture future = CompletableFuture.runAsync(() -> { + try { + Thread.sleep(200); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, adminClientExecutorFactory.newExecutor()); + + decoupler.toCompletableFuture(future).thenCompose(o -> { + onAdminClientThread.set(Thread.currentThread().getName().startsWith("admin-client-")); + return CompletableFuture.completedFuture(null); + }).get(); + + assertFalse(onAdminClientThread.get(), "Future was not decoupled; completion stage ran on admin client Thread"); + } + @Test void testDecoupling_concatenation() throws Exception { List threadNames = new ArrayList(); KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(executorFactory); - decoupler.toCompletableFuture(adminClient.describeCluster().nodes()).thenCompose(o -> { + decoupler.toCompletableFuture(adminClient.describeCluster()).thenCompose(o -> { threadNames.add(Thread.currentThread().getName()); - return decoupler.toCompletableFuture(adminClient - .createAcls(List.of(new AclBinding( - new ResourcePattern(ResourceType.TOPIC, "test", PatternType.LITERAL), - new AccessControlEntry("testuser", "*", AclOperation.ALL, AclPermissionType.ALLOW)))) - .all()); + return decoupler.toCompletableFuture(adminClient.createAcls( + List.of(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "test", PatternType.LITERAL), + new AccessControlEntry("testuser", "*", AclOperation.ALL, AclPermissionType.ALLOW))))); }).thenApply(o -> { threadNames.add(Thread.currentThread().getName()); return null; @@ -115,23 +115,6 @@ void testDecoupling_concatenation() throws Exception { assertEquals(2, new HashSet<>(threadNames).size()); } - @Test - void testDecoupling_listenableFuture() throws Exception { - AtomicBoolean onAdminClientThread = new AtomicBoolean(); - - // after decoupling, future should complete on another Thread - KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(executorFactory); - - ListenableFuture future = new KafkaFutureListenableAdapter<>(adminClient.describeCluster().nodes()); - - decoupler.toCompletableFuture(future).thenCompose(o -> { - onAdminClientThread.set(Thread.currentThread().getName().startsWith("admin-client-")); - return CompletableFuture.completedFuture(null); - }).get(); - - assertFalse(onAdminClientThread.get()); - } - @Test void testDecoupling_doneFuture() throws Exception { AtomicInteger factoryInvocations = new AtomicInteger(); @@ -143,7 +126,7 @@ void testDecoupling_doneFuture() throws Exception { KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(countingExecutorFactory); - KafkaFuture future = adminClient.describeCluster().nodes(); + KafkaFuture future = adminClient.describeCluster(); future.get(); AtomicBoolean applyInvoked = new AtomicBoolean(); @@ -164,13 +147,10 @@ void testDecoupling_failingFuture() throws Exception { KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(countingExecutorFactory); - AtomicBoolean onAdminClientThread = new AtomicBoolean(); - adminClient.setFailOnDescribeCluster(true); - KafkaFuture future = adminClient.describeCluster().nodes(); + KafkaFuture future = adminClient.describeCluster(); try { decoupler.toCompletableFuture(future).whenComplete((t, ex) -> { - onAdminClientThread.set(Thread.currentThread().getName().startsWith("admin-client-")); }).get(); fail("Decoupled future should have failed"); } @@ -192,7 +172,7 @@ void testDecoupling_failedFuture_direct() throws Exception { KafkaFutureDecoupler decoupler = new KafkaFutureDecoupler(countingExecutorFactory); adminClient.setFailOnDescribeCluster(true); - KafkaFuture future = adminClient.describeCluster().nodes(); + KafkaFuture future = adminClient.describeCluster(); try { future.get(); fail("Future should have failed"); @@ -211,62 +191,4 @@ void testDecoupling_failedFuture_direct() throws Exception { } } - private static class KafkaFutureListenableAdapter implements ListenableFuture { - - private KafkaFuture adaptee; - - public KafkaFutureListenableAdapter(KafkaFuture adaptee) { - this.adaptee = adaptee; - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return adaptee.cancel(mayInterruptIfRunning); - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return adaptee.get(); - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return adaptee.get(timeout, unit); - } - - @Override - public boolean isCancelled() { - return adaptee.isCancelled(); - } - - @Override - public boolean isDone() { - return adaptee.isDone(); - } - - @Override - public void addCallback(ListenableFutureCallback callback) { - adaptee.whenComplete((t, ex) -> { - if (ex != null) { - callback.onFailure(ex); - } - else { - callback.onSuccess(t); - } - }); - } - - @Override - public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { - adaptee.whenComplete((t, ex) -> { - if (ex != null) { - failureCallback.onFailure(ex); - } - else { - successCallback.onSuccess(t); - } - }); - } - } - } diff --git a/src/test/java/com/hermesworld/ais/galapagos/security/SecurityConfigIntegrationTest.java b/src/test/java/com/hermesworld/ais/galapagos/security/SecurityConfigIntegrationTest.java new file mode 100644 index 00000000..bc38d343 --- /dev/null +++ b/src/test/java/com/hermesworld/ais/galapagos/security/SecurityConfigIntegrationTest.java @@ -0,0 +1,104 @@ +package com.hermesworld.ais.galapagos.security; + +import com.hermesworld.ais.galapagos.applications.ApplicationsService; +import com.hermesworld.ais.galapagos.applications.controller.ApplicationsController; +import com.hermesworld.ais.galapagos.kafka.KafkaClusters; +import com.hermesworld.ais.galapagos.security.config.GalapagosSecurityProperties; +import com.hermesworld.ais.galapagos.staging.StagingService; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.security.oauth2.client.servlet.OAuth2ClientAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.http.*; +import org.springframework.security.oauth2.jwt.Jwt; +import org.springframework.security.oauth2.jwt.JwtDecoder; + +import java.net.URI; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@SpringBootTest(classes = { SecurityConfig.class, ApplicationsController.class, + GalapagosSecurityProperties.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@EnableAutoConfiguration(exclude = OAuth2ClientAutoConfiguration.class) +class SecurityConfigIntegrationTest { + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; + + @SuppressWarnings("unused") + @MockBean + private ApplicationsService applicationsService; + + @SuppressWarnings("unused") + @MockBean + private StagingService stagingService; + + @SuppressWarnings("unused") + @MockBean + private KafkaClusters kafkaClusters; + + @MockBean + private JwtDecoder jwtDecoder; + + @BeforeEach + void initJwtStuff() { + when(jwtDecoder.decode(any())).thenAnswer(inv -> { + String token = inv.getArgument(0); + Map headers = Map.of("alg", "HS256", "typ", "JWT"); + Map claims = Map.of("sub", "abc123", "iat", "123", "my_roles", token.replace(".", " ")); + return new Jwt(token, Instant.now(), Instant.now().plus(1, ChronoUnit.DAYS), headers, claims); + }); + } + + @Test + void test_apiAccessProtected() { + ResponseEntity response = restTemplate.getForEntity("http://localhost:" + port + "/api/me/requests", + String.class); + assertEquals(HttpStatus.UNAUTHORIZED.value(), response.getStatusCode().value()); + } + + @Test + void test_apiAccess_missingUserRole() { + testApiWithRole("/api/me/requests", "NOT_A_USER", HttpStatus.FORBIDDEN.value()); + } + + @Test + void test_apiAccess_withUserRole() { + testApiWithRole("/api/me/requests", "USER", HttpStatus.OK.value()); + } + + @Test + void test_apiAccess_adminEndpoint_withUserRole() { + testApiWithRole("/api/admin/requests", "USER", HttpStatus.FORBIDDEN.value()); + } + + @Test + void test_apiAccess_adminEndpoint_withAdminRole() { + testApiWithRole("/api/admin/requests", "USER.ADMIN", HttpStatus.OK.value()); + } + + private void testApiWithRole(String endpoint, String roleName, int expectedCode) { + String url = "http://localhost:" + port + endpoint; + + HttpHeaders headers = new HttpHeaders(); + headers.add(HttpHeaders.AUTHORIZATION, "Bearer " + roleName); + + HttpEntity request = new RequestEntity<>(headers, HttpMethod.GET, URI.create(url)); + ResponseEntity response = restTemplate.exchange(url, HttpMethod.GET, request, String.class); + assertEquals(expectedCode, response.getStatusCode().value()); + } + +} diff --git a/src/test/java/com/hermesworld/ais/galapagos/security/impl/OAuthConfigControllerIntegrationTest.java b/src/test/java/com/hermesworld/ais/galapagos/security/impl/OAuthConfigControllerIntegrationTest.java new file mode 100644 index 00000000..cd95af14 --- /dev/null +++ b/src/test/java/com/hermesworld/ais/galapagos/security/impl/OAuthConfigControllerIntegrationTest.java @@ -0,0 +1,124 @@ +package com.hermesworld.ais.galapagos.security.impl; + +import com.hermesworld.ais.galapagos.security.SecurityConfig; +import com.hermesworld.ais.galapagos.security.config.GalapagosSecurityProperties; +import okhttp3.mockwebserver.Dispatcher; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.jetbrains.annotations.NotNull; +import org.json.JSONObject; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientProperties; +import org.springframework.boot.autoconfigure.security.oauth2.client.servlet.OAuth2ClientAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.security.oauth2.jwt.JwtDecoder; +import org.springframework.util.StreamUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; + +@SpringBootTest(classes = { OAuthConfigController.class, SecurityConfig.class, + GalapagosSecurityProperties.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@EnableAutoConfiguration(exclude = OAuth2ClientAutoConfiguration.class) +class OAuthConfigControllerIntegrationTest { + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; + + @MockBean + private OAuth2ClientProperties oauthProperties; + + @MockBean + @SuppressWarnings("unused") + private JwtDecoder jwtDecoder; + + private MockWebServer oauthServer; + + @BeforeEach + void initOauthPropertiesAndServer() throws Exception { + oauthServer = new MockWebServer(); + oauthServer.setDispatcher(new Dispatcher() { + @NotNull + @Override + public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) { + String path = recordedRequest.getPath(); + if (path == null) { + return new MockResponse().setResponseCode(404); + } + if (recordedRequest.getPath().endsWith("/openid-configuration")) { + return new MockResponse().setBody(readOpenidConfig()).setResponseCode(200) + .setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE); + } + + return new MockResponse().setResponseCode(404); + } + }); + oauthServer.start(0); + + Map oauthMap = new HashMap<>(); + OAuth2ClientProperties.Registration reg = new OAuth2ClientProperties.Registration(); + reg.setClientId("test-webapp"); + reg.setProvider("keycloak"); + reg.setScope(Set.of("email", "openid", "profile")); + oauthMap.put("keycloak", reg); + + Map providerMap = new HashMap<>(); + OAuth2ClientProperties.Provider provider = new OAuth2ClientProperties.Provider(); + provider.setIssuerUri("http://localhost:" + oauthServer.getPort() + "/auth/realms/galapagos"); + providerMap.put("keycloak", provider); + + when(oauthProperties.getRegistration()).thenReturn(oauthMap); + when(oauthProperties.getProvider()).thenReturn(providerMap); + } + + @AfterEach + void shutdownServer() throws IOException { + oauthServer.shutdown(); + } + + @Test + void test_getOauthConfig() { + ResponseEntity response = restTemplate.getForEntity("http://localhost:" + port + "/oauth2/config.json", + String.class); + assertTrue(response.getStatusCode().is2xxSuccessful()); + + JSONObject config = new JSONObject(response.getBody()); + assertEquals("test_username", config.get("userNameClaim")); + assertEquals("my_roles", config.get("rolesClaim")); + assertEquals("display_name", config.get("displayNameClaim")); + assertEquals("test-webapp", config.get("clientId")); + } + + private String readOpenidConfig() { + try (InputStream in = OAuthConfigControllerIntegrationTest.class.getClassLoader() + .getResourceAsStream("openid-config.json")) { + return StreamUtils.copyToString(in, StandardCharsets.UTF_8).replace("http://keycloak/", + "http://localhost:" + oauthServer.getPort() + "/"); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 81b8a632..7326755d 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -2,9 +2,9 @@ spring.datasource.url=jdbc:h2:file:./target/test-db;DB_CLOSE_ON_EXIT=TRUE;AUTO_R spring.jpa.hibernate.ddl-auto=update galapagos.security.jwt-email-claim=email -galapagos.security.jwt-display-name-claim=name -galapagos.security.jwt-role-claim=roles -galapagos.security.jwt-user-name-claim=username +galapagos.security.jwt-display-name-claim=display_name +galapagos.security.jwt-role-claim=my_roles +galapagos.security.jwt-user-name-claim=test_username spring.security.oauth2.client.registration.keycloak.client-id=test-webapp spring.security.oauth2.client.registration.keycloak.scope=openid,profile,email,offline_access spring.security.oauth2.client.registration.keycloak.authorization-grant-type=authorization_code diff --git a/src/test/resources/openid-config.json b/src/test/resources/openid-config.json new file mode 100644 index 00000000..8a577eef --- /dev/null +++ b/src/test/resources/openid-config.json @@ -0,0 +1,271 @@ +{ + "issuer": "http://keycloak/auth/realms/galapagos", + "authorization_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/auth", + "token_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/token", + "introspection_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/token/introspect", + "userinfo_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/userinfo", + "end_session_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/logout", + "frontchannel_logout_session_supported": true, + "frontchannel_logout_supported": true, + "jwks_uri": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/certs", + "check_session_iframe": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/login-status-iframe.html", + "grant_types_supported": [ + "authorization_code", + "implicit", + "refresh_token", + "password", + "client_credentials", + "urn:ietf:params:oauth:grant-type:device_code", + "urn:openid:params:grant-type:ciba" + ], + "response_types_supported": [ + "code", + "none", + "id_token", + "token", + "id_token token", + "code id_token", + "code token", + "code id_token token" + ], + "subject_types_supported": [ + "public", + "pairwise" + ], + "id_token_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512" + ], + "id_token_encryption_alg_values_supported": [ + "RSA-OAEP", + "RSA-OAEP-256", + "RSA1_5" + ], + "id_token_encryption_enc_values_supported": [ + "A256GCM", + "A192GCM", + "A128GCM", + "A128CBC-HS256", + "A192CBC-HS384", + "A256CBC-HS512" + ], + "userinfo_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512", + "none" + ], + "request_object_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512", + "none" + ], + "request_object_encryption_alg_values_supported": [ + "RSA-OAEP", + "RSA-OAEP-256", + "RSA1_5" + ], + "request_object_encryption_enc_values_supported": [ + "A256GCM", + "A192GCM", + "A128GCM", + "A128CBC-HS256", + "A192CBC-HS384", + "A256CBC-HS512" + ], + "response_modes_supported": [ + "query", + "fragment", + "form_post", + "query.jwt", + "fragment.jwt", + "form_post.jwt", + "jwt" + ], + "registration_endpoint": "http://keycloak/auth/realms/galapagos/clients-registrations/openid-connect", + "token_endpoint_auth_methods_supported": [ + "private_key_jwt", + "client_secret_basic", + "client_secret_post", + "tls_client_auth", + "client_secret_jwt" + ], + "token_endpoint_auth_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512" + ], + "introspection_endpoint_auth_methods_supported": [ + "private_key_jwt", + "client_secret_basic", + "client_secret_post", + "tls_client_auth", + "client_secret_jwt" + ], + "introspection_endpoint_auth_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512" + ], + "authorization_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512" + ], + "authorization_encryption_alg_values_supported": [ + "RSA-OAEP", + "RSA-OAEP-256", + "RSA1_5" + ], + "authorization_encryption_enc_values_supported": [ + "A256GCM", + "A192GCM", + "A128GCM", + "A128CBC-HS256", + "A192CBC-HS384", + "A256CBC-HS512" + ], + "claims_supported": [ + "aud", + "sub", + "iss", + "auth_time", + "name", + "given_name", + "family_name", + "preferred_username", + "email", + "acr" + ], + "claim_types_supported": [ + "normal" + ], + "claims_parameter_supported": true, + "scopes_supported": [ + "openid", + "microprofile-jwt", + "address", + "roles", + "web-origins", + "offline_access", + "profile", + "email" + ], + "request_parameter_supported": true, + "request_uri_parameter_supported": true, + "require_request_uri_registration": true, + "code_challenge_methods_supported": [ + "plain", + "S256" + ], + "tls_client_certificate_bound_access_tokens": true, + "revocation_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/revoke", + "revocation_endpoint_auth_methods_supported": [ + "private_key_jwt", + "client_secret_basic", + "client_secret_post", + "tls_client_auth", + "client_secret_jwt" + ], + "revocation_endpoint_auth_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "HS256", + "HS512", + "ES256", + "RS256", + "HS384", + "ES512", + "PS256", + "PS512", + "RS512" + ], + "backchannel_logout_supported": true, + "backchannel_logout_session_supported": true, + "device_authorization_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/auth/device", + "backchannel_token_delivery_modes_supported": [ + "poll", + "ping" + ], + "backchannel_authentication_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/ext/ciba/auth", + "backchannel_authentication_request_signing_alg_values_supported": [ + "PS384", + "ES384", + "RS384", + "ES256", + "RS256", + "ES512", + "PS256", + "PS512", + "RS512" + ], + "require_pushed_authorization_requests": false, + "pushed_authorization_request_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/ext/par/request", + "mtls_endpoint_aliases": { + "token_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/token", + "revocation_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/revoke", + "introspection_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/token/introspect", + "device_authorization_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/auth/device", + "registration_endpoint": "http://keycloak/auth/realms/galapagos/clients-registrations/openid-connect", + "userinfo_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/userinfo", + "pushed_authorization_request_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/ext/par/request", + "backchannel_authentication_endpoint": "http://keycloak/auth/realms/galapagos/protocol/openid-connect/ext/ciba/auth" + } +} diff --git a/ui/src/app/app-routing.module.ts b/ui/src/app/app-routing.module.ts index eee58250..0f19ac5d 100644 --- a/ui/src/app/app-routing.module.ts +++ b/ui/src/app/app-routing.module.ts @@ -3,10 +3,21 @@ import { RouterModule, Routes } from '@angular/router'; import { AuthGuard } from './shared'; const routes: Routes = [ - { path: '', loadChildren: () => import('./layout/layout.module').then(m => m.LayoutModule), canActivate: [AuthGuard] }, + { + path: '', + loadChildren: () => import('./layout/layout.module').then(m => m.LayoutModule), + canActivate: [AuthGuard] + }, { path: 'error', loadChildren: () => import('./server-error/server-error.module').then(m => m.ServerErrorModule) }, - { path: 'access-denied', loadChildren: () => import('./access-denied/access-denied.module').then(m => m.AccessDeniedModule) }, + { + path: 'access-denied', + loadChildren: () => import('./access-denied/access-denied.module').then(m => m.AccessDeniedModule) + }, { path: 'not-found', loadChildren: () => import('./not-found/not-found.module').then(m => m.NotFoundModule) }, + { + path: 'logout-success', + loadChildren: () => import('./logout-success/logout-success.module').then(m => m.LogoutSuccessModule) + }, { path: '**', redirectTo: 'not-found' } ]; diff --git a/ui/src/app/app.component.html b/ui/src/app/app.component.html index 9f11dcbb..bb4ab665 100644 --- a/ui/src/app/app.component.html +++ b/ui/src/app/app.component.html @@ -1,4 +1,4 @@ -
+
diff --git a/ui/src/app/layout/components/header/header.component.html b/ui/src/app/layout/components/header/header.component.html index 4c8a4e04..8f5d5ae3 100644 --- a/ui/src/app/layout/components/header/header.component.html +++ b/ui/src/app/layout/components/header/header.component.html @@ -48,7 +48,7 @@ [routerLink]="['/user-settings']"> {{ 'User Settings' | translate }} - + {{ 'Log Out' | translate }} diff --git a/ui/src/app/layout/components/header/header.component.ts b/ui/src/app/layout/components/header/header.component.ts index 85745de0..a98d6e62 100644 --- a/ui/src/app/layout/components/header/header.component.ts +++ b/ui/src/app/layout/components/header/header.component.ts @@ -77,7 +77,7 @@ export class HeaderComponent implements OnInit { dom.classList.toggle(this.pushRightClass); } - async onLoggedout() { + onLoggedout() { return this.authService.logout(); } diff --git a/ui/src/app/logout-success/logout-success-routing.module.ts b/ui/src/app/logout-success/logout-success-routing.module.ts new file mode 100644 index 00000000..40986917 --- /dev/null +++ b/ui/src/app/logout-success/logout-success-routing.module.ts @@ -0,0 +1,16 @@ +import { NgModule } from '@angular/core'; +import { RouterModule, Routes } from '@angular/router'; +import { LogoutSuccessComponent } from './logout-success.component'; + +const routes: Routes = [ + { + path: '', component: LogoutSuccessComponent + } +]; + +@NgModule({ + imports: [RouterModule.forChild(routes)], + exports: [RouterModule] +}) +export class LogoutSuccessRoutingModule { +} diff --git a/ui/src/app/logout-success/logout-success.component.html b/ui/src/app/logout-success/logout-success.component.html new file mode 100644 index 00000000..87daddb1 --- /dev/null +++ b/ui/src/app/logout-success/logout-success.component.html @@ -0,0 +1,3 @@ +

+ You have been logged out successfully. Click here to log in again. +

diff --git a/ui/src/app/logout-success/logout-success.component.scss b/ui/src/app/logout-success/logout-success.component.scss new file mode 100644 index 00000000..be89e36d --- /dev/null +++ b/ui/src/app/logout-success/logout-success.component.scss @@ -0,0 +1,5 @@ +p { + margin-top: 1em; + text-align: center; +} + diff --git a/ui/src/app/logout-success/logout-success.component.ts b/ui/src/app/logout-success/logout-success.component.ts new file mode 100644 index 00000000..4c5381f1 --- /dev/null +++ b/ui/src/app/logout-success/logout-success.component.ts @@ -0,0 +1,16 @@ +import { Component, OnInit } from '@angular/core'; + +@Component({ + selector: 'app-logout-success', + templateUrl: './logout-success.component.html', + styleUrls: ['./logout-success.component.scss'] +}) +export class LogoutSuccessComponent implements OnInit { + + constructor() { + } + + ngOnInit() { + } + +} diff --git a/ui/src/app/logout-success/logout-success.module.ts b/ui/src/app/logout-success/logout-success.module.ts new file mode 100644 index 00000000..429dfc3f --- /dev/null +++ b/ui/src/app/logout-success/logout-success.module.ts @@ -0,0 +1,14 @@ +import { NgModule } from '@angular/core'; +import { CommonModule } from '@angular/common'; +import { LogoutSuccessComponent } from './logout-success.component'; +import { LogoutSuccessRoutingModule } from './logout-success-routing.module'; + +@NgModule({ + imports: [ + CommonModule, + LogoutSuccessRoutingModule + ], + declarations: [LogoutSuccessComponent] +}) +export class LogoutSuccessModule { +} diff --git a/ui/src/app/not-found/not-found.component.html b/ui/src/app/not-found/not-found.component.html index ff604933..f174ffb3 100644 --- a/ui/src/app/not-found/not-found.component.html +++ b/ui/src/app/not-found/not-found.component.html @@ -1,3 +1,4 @@

- not-found works! + Specified page could not be found. Please contact an Administrator if you think this is an error.
+ Back to Galapagos start page

diff --git a/ui/src/app/not-found/not-found.component.scss b/ui/src/app/not-found/not-found.component.scss index e69de29b..d9a49be5 100644 --- a/ui/src/app/not-found/not-found.component.scss +++ b/ui/src/app/not-found/not-found.component.scss @@ -0,0 +1,4 @@ +p { + margin-top: 1em; + text-align: center; +} diff --git a/ui/src/app/shared/guard/auth.guard.ts b/ui/src/app/shared/guard/auth.guard.ts index e82f02d5..7fd0245c 100644 --- a/ui/src/app/shared/guard/auth.guard.ts +++ b/ui/src/app/shared/guard/auth.guard.ts @@ -14,6 +14,7 @@ export class AuthGuard implements CanActivate { const authenticated = await this.authService.tryLoginFromData(); if (!authenticated) { + this.authService.showBounceBubbles.next(true); return this.authService.login(state.url); } diff --git a/ui/src/app/shared/services/auth.service.ts b/ui/src/app/shared/services/auth.service.ts index cabd42aa..d91a2ee5 100644 --- a/ui/src/app/shared/services/auth.service.ts +++ b/ui/src/app/shared/services/auth.service.ts @@ -29,6 +29,8 @@ export class AuthService { userProfile: Observable; + showBounceBubbles = new BehaviorSubject(false); + private authenticatedSubject = new BehaviorSubject(false); private rolesSubject = new BehaviorSubject([]); @@ -53,6 +55,7 @@ export class AuthService { this.loadAuthConfig().then(config => { this.configure(config); this.configLoaded.next(true); + this.showBounceBubbles.next(false); }); window.addEventListener('storage', event => { @@ -87,8 +90,8 @@ export class AuthService { }); } - public logout() { - this.oauthService.logOut(); + async logout() { + return this.oauthService.logOut(true, 'state'); } private getConfigLoadedObservable(): Observable { @@ -113,7 +116,7 @@ export class AuthService { preserveRequestedRoute: false, - postLogoutRedirectUri: '/logout' + redirectUriAsPostLogoutRedirectUriFallback: false })))); } @@ -150,10 +153,11 @@ export class AuthService { return authenticated; } - private handleLogout() { + private async handleLogout() { this.authenticatedSubject.next(false); this.profileSubject.next(emptyUserProfile); this.rolesSubject.next([]); + await this.router.navigateByUrl('/logout-success'); } private extractRoles(jwtClaims: object): string[] {