diff --git a/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceService.kt b/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceService.kt index 6ff896850..db0cf77fa 100644 --- a/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceService.kt +++ b/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceService.kt @@ -1,6 +1,7 @@ package com.rarible.protocol.erc20.core.service import com.rarible.contracts.erc20.IERC20 +import com.rarible.core.common.EventTimeMarks import com.rarible.core.common.nowMillis import com.rarible.core.common.optimisticLock import com.rarible.core.entity.reducer.service.EntityService @@ -10,6 +11,7 @@ import com.rarible.protocol.erc20.core.event.Erc20BalanceEventListener import com.rarible.protocol.erc20.core.model.BalanceId import com.rarible.protocol.erc20.core.model.Erc20Allowance import com.rarible.protocol.erc20.core.model.Erc20AllowanceEvent +import com.rarible.protocol.erc20.core.model.Erc20Event import com.rarible.protocol.erc20.core.model.Erc20MarkedEvent import com.rarible.protocol.erc20.core.repository.Erc20AllowanceRepository import kotlinx.coroutines.reactor.awaitSingleOrNull @@ -41,14 +43,21 @@ class Erc20AllowanceService( ) } - override suspend fun update(entity: Erc20Allowance, event: Erc20MarkedEvent?): Erc20Allowance { + override suspend fun update(entity: Erc20Allowance, event: Erc20MarkedEvent?): Erc20Allowance = + update(entity = entity, event = event?.event, eventTimeMarks = event?.eventTimeMarks) + + private suspend fun update( + entity: Erc20Allowance, + eventTimeMarks: EventTimeMarks?, + event: Erc20Event? + ): Erc20Allowance { val result = if (saveToDb) { erc20AllowanceRepository.save(entity) } else { entity } erc20BalanceEventListeners.forEach { - it.onUpdate(Erc20AllowanceEvent(event?.event, event?.eventTimeMarks, entity)) + it.onUpdate(Erc20AllowanceEvent(event, eventTimeMarks, entity)) } return result } @@ -58,7 +67,11 @@ class Erc20AllowanceService( ?.let { EthUInt256(it) } } - suspend fun onChainUpdate(balanceId: BalanceId, event: Erc20MarkedEvent?) { + suspend fun onChainUpdate( + balanceId: BalanceId, + eventTimeMarks: EventTimeMarks?, + event: Erc20Event? + ) { optimisticLock { val erc20Allowance = get(balanceId) ?: Erc20Allowance( token = balanceId.token, @@ -71,7 +84,7 @@ class Erc20AllowanceService( logger.error("Can't get allowance $balanceId from blockchain") throw IllegalStateException("Can't get allowance $balanceId from blockchain") } - update(entity = erc20Allowance.withAllowance(allowance), event = event) + update(entity = erc20Allowance.withAllowance(allowance), event = event, eventTimeMarks = eventTimeMarks) } } diff --git a/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateService.kt b/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateService.kt index a05194dcc..4ac917cfe 100644 --- a/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateService.kt +++ b/erc20/core/src/main/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateService.kt @@ -31,7 +31,11 @@ class Erc20EventChainUpdateService( } val approvalEvent = erc20Events.lastOrNull { it?.event is Erc20Event.Erc20TokenApprovalEvent } if (approvalEvent != null) { - erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = approvalEvent) + erc20AllowanceService.onChainUpdate( + balanceId = balanceId, + event = approvalEvent.event, + eventTimeMarks = approvalEvent.eventTimeMarks + ) } val balance = erc20BalanceService.onChainUpdate(balanceId, erc20Events.last()) if (balance == null) { diff --git a/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceServiceTest.kt b/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceServiceTest.kt index 66cfc23b1..c9e7a3784 100644 --- a/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceServiceTest.kt +++ b/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/Erc20AllowanceServiceTest.kt @@ -96,7 +96,7 @@ internal class Erc20AllowanceServiceTest { coEvery { erc20BalanceEventListener.onUpdate(any()) } returns Unit - erc20AllowanceService.onChainUpdate(balanceId, null) + erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null) coVerify { erc20BalanceEventListener.onUpdate(withArg { @@ -141,7 +141,7 @@ internal class Erc20AllowanceServiceTest { coEvery { erc20BalanceEventListener.onUpdate(any()) } returns Unit - erc20AllowanceService.onChainUpdate(balanceId, null) + erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null) coVerify { erc20BalanceEventListener.onUpdate(withArg { @@ -179,7 +179,7 @@ internal class Erc20AllowanceServiceTest { assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy { runBlocking { - erc20AllowanceService.onChainUpdate(balanceId, null) + erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null) } } } diff --git a/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateServiceTest.kt b/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateServiceTest.kt index 40b219ecc..14889e22f 100644 --- a/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateServiceTest.kt +++ b/erc20/core/src/test/kotlin/com/rarible/protocol/erc20/core/service/reduce/Erc20EventChainUpdateServiceTest.kt @@ -68,12 +68,12 @@ class Erc20EventChainUpdateServiceTest { ).map { LogRecordEvent(it, false, EventTimeMarks("test", emptyList())) } coEvery { balanceService.onChainUpdate(balanceId, any()) } returns mockk() - coEvery { allowanceService.onChainUpdate(balanceId, any()) } returns Unit + coEvery { allowanceService.onChainUpdate(balanceId, any(), any()) } returns Unit service.onEntityEvents(events) coVerify { balanceService.onChainUpdate(balanceId, any()) } - coVerify { allowanceService.onChainUpdate(balanceId, any()) } + coVerify { allowanceService.onChainUpdate(balanceId, any(), any()) } coVerify(exactly = 0) { delegate.onEntityEvents(any()) } } } diff --git a/erc20/listener/pom.xml b/erc20/listener/pom.xml index ebcbbce15..1bf51756c 100644 --- a/erc20/listener/pom.xml +++ b/erc20/listener/pom.xml @@ -25,6 +25,10 @@ com.rarible.protocol protocol-erc20-indexer-core + + com.rarible.protocol.ethereum + protocol-subscriber-order-starter + com.rarible.protocol.ethereum protocol-subscriber-erc20-starter diff --git a/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerConfiguration.kt b/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerConfiguration.kt index 131d81a79..9ad4f2866 100644 --- a/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerConfiguration.kt +++ b/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerConfiguration.kt @@ -3,19 +3,25 @@ package com.rarible.protocol.erc20.listener.configuration import com.github.cloudyrock.spring.v5.EnableMongock import com.rarible.core.application.ApplicationEnvironmentInfo import com.rarible.core.kafka.RaribleKafkaConsumerFactory +import com.rarible.core.kafka.RaribleKafkaConsumerSettings import com.rarible.core.kafka.RaribleKafkaConsumerWorker import com.rarible.ethereum.contract.EnableContractService import com.rarible.ethereum.converters.EnableScaletherMongoConversions import com.rarible.ethereum.domain.Blockchain +import com.rarible.protocol.dto.ActivityTopicProvider import com.rarible.protocol.dto.Erc20BalanceEventDto +import com.rarible.protocol.dto.EthActivityEventDto import com.rarible.protocol.erc20.api.subscriber.Erc20IndexerEventsConsumerFactory import com.rarible.protocol.erc20.core.configuration.ProducerConfiguration import com.rarible.protocol.erc20.core.event.Erc20EventPublisher import com.rarible.protocol.erc20.core.event.KafkaErc20BalanceEventListener import com.rarible.protocol.erc20.core.metric.CheckerMetrics import com.rarible.protocol.erc20.core.metric.DescriptorMetrics +import com.rarible.protocol.erc20.listener.listener.OrderActivityEventHandler import com.rarible.protocol.erc20.listener.scanner.BalanceBatchCheckerHandler +import com.rarible.protocol.order.api.subscriber.autoconfigure.OrderIndexerEventsSubscriberProperties import io.micrometer.core.instrument.MeterRegistry +import org.apache.kafka.clients.consumer.OffsetResetStrategy import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -31,12 +37,16 @@ import scalether.core.MonoEthereum class Erc20ListenerConfiguration( private val environmentInfo: ApplicationEnvironmentInfo, private val commonProperties: Erc20ListenerProperties, - private val erc20IndexerEventsConsumerFactory: Erc20IndexerEventsConsumerFactory + private val erc20IndexerEventsConsumerFactory: Erc20IndexerEventsConsumerFactory, + private val orderIndexerEventsSubscriberProperties: OrderIndexerEventsSubscriberProperties, ) { private val erc20BalanceConsumerGroup = "protocol.${commonProperties.blockchain.value}.erc20.indexer.erc20-balance" + private val erc20OrderActivityConsumerGroup = + "protocol.${commonProperties.blockchain.value}.erc20.indexer.order-activity" + @Bean fun raribleKafkaConsumerFactory(): RaribleKafkaConsumerFactory { return RaribleKafkaConsumerFactory( @@ -88,4 +98,29 @@ class Erc20ListenerConfiguration( handler = BalanceBatchCheckerHandler(ethereum, checkerMetrics, commonProperties) ) } + + @Bean + fun orderActivityWorker( + checkerMetrics: CheckerMetrics, + raribleKafkaConsumerFactory: RaribleKafkaConsumerFactory, + orderActivityEventHandler: OrderActivityEventHandler, + ): RaribleKafkaConsumerWorker { + val settings = RaribleKafkaConsumerSettings( + hosts = orderIndexerEventsSubscriberProperties.brokerReplicaSet, + topic = ActivityTopicProvider.getActivityTopic( + environment = environmentInfo.name, + blockchain = blockchain().value + ), + group = erc20OrderActivityConsumerGroup, + concurrency = commonProperties.orderActivityProperties.eventsHandleConcurrency, + batchSize = commonProperties.orderActivityProperties.eventsHandleBatchSize, + async = false, + offsetResetStrategy = OffsetResetStrategy.LATEST, + valueClass = EthActivityEventDto::class.java, + ) + return raribleKafkaConsumerFactory.createWorker( + settings = settings, + handler = orderActivityEventHandler, + ) + } } diff --git a/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerProperties.kt b/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerProperties.kt index 6815e50b6..8fe4a7a84 100644 --- a/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerProperties.kt +++ b/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/configuration/Erc20ListenerProperties.kt @@ -21,9 +21,15 @@ data class Erc20ListenerProperties( val eventConsumerWorker: DaemonWorkerProperties = DaemonWorkerProperties(), val skipTransferContracts: List = emptyList(), val balanceCheckerProperties: BalanceCheckerProperties = BalanceCheckerProperties(), + val orderActivityProperties: OrderActivityProperties = OrderActivityProperties(), val job: Erc20JobProperties = Erc20JobProperties() ) +data class OrderActivityProperties( + val eventsHandleBatchSize: Int = 200, + val eventsHandleConcurrency: Int = 9, +) + data class BalanceCheckerProperties( val skipNumberOfBlocks: Long = 20, val confirms: Int = 2, diff --git a/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/listener/OrderActivityEventHandler.kt b/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/listener/OrderActivityEventHandler.kt new file mode 100644 index 000000000..72c9e7032 --- /dev/null +++ b/erc20/listener/src/main/kotlin/com/rarible/protocol/erc20/listener/listener/OrderActivityEventHandler.kt @@ -0,0 +1,59 @@ +package com.rarible.protocol.erc20.listener.listener + +import com.rarible.core.kafka.RaribleKafkaEventHandler +import com.rarible.protocol.dto.Erc20AssetTypeDto +import com.rarible.protocol.dto.EthActivityEventDto +import com.rarible.protocol.dto.OrderActivityMatchDto +import com.rarible.protocol.dto.OrderActivityMatchSideDto +import com.rarible.protocol.dto.toModel +import com.rarible.protocol.erc20.core.misc.addIndexerIn +import com.rarible.protocol.erc20.core.model.BalanceId +import com.rarible.protocol.erc20.core.service.Erc20AllowanceService +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Component + +@Component +class OrderActivityEventHandler( + private val erc20AllowanceService: Erc20AllowanceService, +) : RaribleKafkaEventHandler { + + override suspend fun handle(event: EthActivityEventDto) { + val activity = event.activity + if (activity !is OrderActivityMatchDto) { + return + } + logger.info("Handle event: $event") + if (activity.type == OrderActivityMatchDto.Type.ACCEPT_BID) { + handleActivity(activity, event) + } + } + + private suspend fun handleActivity( + activity: OrderActivityMatchDto, + event: EthActivityEventDto + ) { + val sideDto = if (activity.left.type == OrderActivityMatchSideDto.Type.BID) { + activity.left + } else { + activity.right + } + val assetType = sideDto.asset.assetType + if (assetType is Erc20AssetTypeDto) { + val owner = sideDto.maker + val token = assetType.contract + logger.info( + "Will recalculate erc20 balance owner=$owner, token=$token. " + + "After bid execution id=${sideDto.hash}" + ) + erc20AllowanceService.onChainUpdate( + balanceId = BalanceId(token = token, owner = owner), + eventTimeMarks = event.eventTimeMarks.toModel().addIndexerIn(), + event = null, + ) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(OrderActivityEventHandler::class.java) + } +} diff --git a/erc20/listener/src/main/resources/config/application-consul.yml b/erc20/listener/src/main/resources/config/application-consul.yml index 0732d01ed..c6a7ab8d0 100644 --- a/erc20/listener/src/main/resources/config/application-consul.yml +++ b/erc20/listener/src/main/resources/config/application-consul.yml @@ -21,5 +21,9 @@ protocol: erc20: subscriber: broker-replica-set: ${kafka.hosts} + order: + subscriber: + broker-replica-set: ${kafka.hosts} + common: kafka-replica-set: ${kafka.hosts} diff --git a/erc20/listener/src/test/kotlin/com/rarible/protocol/erc20/listener/listener/OrderActivityEventHandlerTest.kt b/erc20/listener/src/test/kotlin/com/rarible/protocol/erc20/listener/listener/OrderActivityEventHandlerTest.kt new file mode 100644 index 000000000..0b211426b --- /dev/null +++ b/erc20/listener/src/test/kotlin/com/rarible/protocol/erc20/listener/listener/OrderActivityEventHandlerTest.kt @@ -0,0 +1,269 @@ +package com.rarible.protocol.erc20.listener.listener + +import com.rarible.core.common.nowMillis +import com.rarible.core.test.data.randomWord +import com.rarible.protocol.dto.AssetDto +import com.rarible.protocol.dto.Erc20AssetTypeDto +import com.rarible.protocol.dto.Erc721AssetTypeDto +import com.rarible.protocol.dto.EthActivityEventDto +import com.rarible.protocol.dto.EthAssetTypeDto +import com.rarible.protocol.dto.EventTimeMarksDto +import com.rarible.protocol.dto.OrderActivityCancelListDto +import com.rarible.protocol.dto.OrderActivityDto +import com.rarible.protocol.dto.OrderActivityMatchDto +import com.rarible.protocol.dto.OrderActivityMatchSideDto +import com.rarible.protocol.erc20.core.model.BalanceId +import com.rarible.protocol.erc20.core.service.Erc20AllowanceService +import io.daonomic.rpc.domain.Word +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.InjectMocks +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.argThat +import org.mockito.kotlin.eq +import org.mockito.kotlin.isNull +import org.mockito.kotlin.verify +import org.mockito.kotlin.verifyNoInteractions +import scalether.domain.Address +import java.math.BigDecimal +import java.math.BigInteger + +@ExtendWith(MockitoExtension::class) +internal class OrderActivityEventHandlerTest { + @InjectMocks + private lateinit var orderActivityEventHandler: OrderActivityEventHandler + + @Mock + private lateinit var erc20AllowanceService: Erc20AllowanceService + + @Test + fun handle() = runBlocking { + orderActivityEventHandler.handle( + EthActivityEventDto( + activity = OrderActivityMatchDto( + id = "id", + blockHash = Word.apply(randomWord()), + blockNumber = 1, + date = nowMillis(), + logIndex = 1, + price = BigDecimal.ONE, + transactionHash = Word.apply(randomWord()), + source = OrderActivityDto.Source.RARIBLE, + type = OrderActivityMatchDto.Type.ACCEPT_BID, + left = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.ONE(), + asset = AssetDto( + assetType = Erc20AssetTypeDto( + contract = Address.TWO(), + ), + value = BigInteger.ONE, + ), + type = OrderActivityMatchSideDto.Type.BID, + ), + right = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.THREE(), + asset = AssetDto( + assetType = Erc721AssetTypeDto( + contract = Address.FOUR(), + tokenId = BigInteger.ONE, + ), + value = BigInteger.ONE, + ), + type = OrderActivityMatchSideDto.Type.SELL, + ) + + ), + eventTimeMarks = EventTimeMarksDto("source", marks = emptyList()) + ) + ) + + verify(erc20AllowanceService).onChainUpdate( + balanceId = eq( + BalanceId( + owner = Address.ONE(), + token = Address.TWO() + ) + ), + eventTimeMarks = argThat { + source == "source" && marks.size == 1 && marks[0].name == "indexer-in_erc20" + }, + event = isNull(), + ) + } + + @Test + fun `handle not ACCEPT_BID`() = runBlocking { + orderActivityEventHandler.handle( + EthActivityEventDto( + activity = OrderActivityMatchDto( + id = "id", + blockHash = Word.apply(randomWord()), + blockNumber = 1, + date = nowMillis(), + logIndex = 1, + price = BigDecimal.ONE, + transactionHash = Word.apply(randomWord()), + source = OrderActivityDto.Source.RARIBLE, + type = OrderActivityMatchDto.Type.SELL, + right = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.ONE(), + asset = AssetDto( + assetType = Erc20AssetTypeDto( + contract = Address.TWO(), + ), + value = BigInteger.ONE, + ) + ), + left = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.THREE(), + asset = AssetDto( + assetType = Erc721AssetTypeDto( + contract = Address.FOUR(), + tokenId = BigInteger.ONE, + ), + value = BigInteger.ONE, + ) + ) + + ), + eventTimeMarks = EventTimeMarksDto("source", marks = emptyList()) + ) + ) + + verifyNoInteractions(erc20AllowanceService) + } + + @Test + fun `handle not match`() = runBlocking { + orderActivityEventHandler.handle( + EthActivityEventDto( + activity = OrderActivityCancelListDto( + id = "id", + blockHash = Word.apply(randomWord()), + blockNumber = 1, + date = nowMillis(), + logIndex = 1, + transactionHash = Word.apply(randomWord()), + source = OrderActivityDto.Source.RARIBLE, + take = Erc20AssetTypeDto( + contract = Address.TWO(), + ), + make = Erc721AssetTypeDto( + contract = Address.FOUR(), + tokenId = BigInteger.ONE, + ), + maker = Address.ONE(), + hash = Word.apply(randomWord()) + ), + eventTimeMarks = EventTimeMarksDto("source", marks = emptyList()) + ) + ) + + verifyNoInteractions(erc20AllowanceService) + } + + @Test + fun `handle not erc20`() = runBlocking { + orderActivityEventHandler.handle( + EthActivityEventDto( + activity = OrderActivityMatchDto( + id = "id", + blockHash = Word.apply(randomWord()), + blockNumber = 1, + date = nowMillis(), + logIndex = 1, + price = BigDecimal.ONE, + transactionHash = Word.apply(randomWord()), + source = OrderActivityDto.Source.RARIBLE, + type = OrderActivityMatchDto.Type.ACCEPT_BID, + left = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.ONE(), + asset = AssetDto( + assetType = EthAssetTypeDto(), + value = BigInteger.ONE, + ) + ), + right = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.THREE(), + asset = AssetDto( + assetType = Erc721AssetTypeDto( + contract = Address.FOUR(), + tokenId = BigInteger.ONE, + ), + value = BigInteger.ONE, + ) + ) + + ), + eventTimeMarks = EventTimeMarksDto("source", marks = emptyList()) + ) + ) + + verifyNoInteractions(erc20AllowanceService) + } + + @Test + fun `handle switch left right`() = runBlocking { + orderActivityEventHandler.handle( + EthActivityEventDto( + activity = OrderActivityMatchDto( + id = "id", + blockHash = Word.apply(randomWord()), + blockNumber = 1, + date = nowMillis(), + logIndex = 1, + price = BigDecimal.ONE, + transactionHash = Word.apply(randomWord()), + source = OrderActivityDto.Source.RARIBLE, + type = OrderActivityMatchDto.Type.ACCEPT_BID, + right = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.THREE(), + asset = AssetDto( + assetType = Erc20AssetTypeDto( + contract = Address.FOUR(), + ), + value = BigInteger.ONE, + ), + type = OrderActivityMatchSideDto.Type.BID, + ), + left = OrderActivityMatchSideDto( + hash = Word.apply(randomWord()), + maker = Address.ONE(), + asset = AssetDto( + assetType = Erc721AssetTypeDto( + contract = Address.TWO(), + tokenId = BigInteger.ONE, + ), + value = BigInteger.ONE, + ), + type = OrderActivityMatchSideDto.Type.SELL, + ) + + ), + eventTimeMarks = EventTimeMarksDto("source", marks = emptyList()) + ) + ) + + verify(erc20AllowanceService).onChainUpdate( + balanceId = eq( + BalanceId( + owner = Address.THREE(), + token = Address.FOUR() + ) + ), + eventTimeMarks = argThat { + source == "source" && marks.size == 1 && marks[0].name == "indexer-in_erc20" + }, + event = isNull(), + ) + } +} diff --git a/erc20/listener/src/test/resources/config/application-integration.yml b/erc20/listener/src/test/resources/config/application-integration.yml index 0c0632aea..a7d219426 100644 --- a/erc20/listener/src/test/resources/config/application-integration.yml +++ b/erc20/listener/src/test/resources/config/application-integration.yml @@ -9,6 +9,9 @@ protocol: erc20: subscriber: broker-replica-set: ${kafka.hosts} + order: + subscriber: + broker-replica-set: ${kafka.hosts} common: kafka-replica-set: ${kafka.hosts} diff --git a/nft/core/src/test/kotlin/com/rarible/protocol/nft/core/service/item/meta/ArtBlocksPropertiesResolverTest.kt b/nft/core/src/test/kotlin/com/rarible/protocol/nft/core/service/item/meta/ArtBlocksPropertiesResolverTest.kt index c0030a7d4..10acaa106 100644 --- a/nft/core/src/test/kotlin/com/rarible/protocol/nft/core/service/item/meta/ArtBlocksPropertiesResolverTest.kt +++ b/nft/core/src/test/kotlin/com/rarible/protocol/nft/core/service/item/meta/ArtBlocksPropertiesResolverTest.kt @@ -9,7 +9,6 @@ import com.rarible.protocol.nft.core.service.item.meta.descriptors.ArtBlocksProp import com.rarible.protocol.nft.core.service.item.meta.properties.ContentBuilder import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import scalether.domain.Address import java.math.BigInteger diff --git a/order/core/src/main/kotlin/com/rarible/protocol/order/core/service/block/order/OrderActivitySubscriber.kt b/order/core/src/main/kotlin/com/rarible/protocol/order/core/service/block/order/OrderActivitySubscriber.kt index cc8cb2353..cc71faa67 100644 --- a/order/core/src/main/kotlin/com/rarible/protocol/order/core/service/block/order/OrderActivitySubscriber.kt +++ b/order/core/src/main/kotlin/com/rarible/protocol/order/core/service/block/order/OrderActivitySubscriber.kt @@ -36,7 +36,7 @@ class OrderActivitySubscriber( logger.info("Order log event: id=${logRecord.id}, dataType=$dataType, reverted=${event.reverted}") convert(logRecord, event.reverted)?.let { - eventPublisher.publish(it, event.eventTimeMarks.addIndexerIn(indexerInMark)) + eventPublisher.publish(it, eventTimeMarks) } } } diff --git a/pom.xml b/pom.xml index 205faf0ed..f9549016c 100644 --- a/pom.xml +++ b/pom.xml @@ -116,6 +116,12 @@ 2.0.1 + + org.mockito.kotlin + mockito-kotlin + 4.1.0 + + @@ -138,6 +144,11 @@ org.jetbrains.kotlin kotlin-reflect + + org.mockito.kotlin + mockito-kotlin + test +