Skip to content

Commit

Permalink
feat(PT-3255): update approval on bid execution (#714)
Browse files Browse the repository at this point in the history
* feat(PT-3255): update approval on bid execution

* feat(PT-3255): update approval on bid execution

* feat(PT-3255): update approval on bid execution
  • Loading branch information
dmitry-saraykin authored Aug 1, 2023
1 parent 3e91402 commit ebb7b95
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.rarible.protocol.erc20.core.service

import com.rarible.contracts.erc20.IERC20
import com.rarible.core.common.EventTimeMarks
import com.rarible.core.common.nowMillis
import com.rarible.core.common.optimisticLock
import com.rarible.core.entity.reducer.service.EntityService
Expand All @@ -10,6 +11,7 @@ import com.rarible.protocol.erc20.core.event.Erc20BalanceEventListener
import com.rarible.protocol.erc20.core.model.BalanceId
import com.rarible.protocol.erc20.core.model.Erc20Allowance
import com.rarible.protocol.erc20.core.model.Erc20AllowanceEvent
import com.rarible.protocol.erc20.core.model.Erc20Event
import com.rarible.protocol.erc20.core.model.Erc20MarkedEvent
import com.rarible.protocol.erc20.core.repository.Erc20AllowanceRepository
import kotlinx.coroutines.reactor.awaitSingleOrNull
Expand Down Expand Up @@ -41,14 +43,21 @@ class Erc20AllowanceService(
)
}

override suspend fun update(entity: Erc20Allowance, event: Erc20MarkedEvent?): Erc20Allowance {
override suspend fun update(entity: Erc20Allowance, event: Erc20MarkedEvent?): Erc20Allowance =
update(entity = entity, event = event?.event, eventTimeMarks = event?.eventTimeMarks)

private suspend fun update(
entity: Erc20Allowance,
eventTimeMarks: EventTimeMarks?,
event: Erc20Event?
): Erc20Allowance {
val result = if (saveToDb) {
erc20AllowanceRepository.save(entity)
} else {
entity
}
erc20BalanceEventListeners.forEach {
it.onUpdate(Erc20AllowanceEvent(event?.event, event?.eventTimeMarks, entity))
it.onUpdate(Erc20AllowanceEvent(event, eventTimeMarks, entity))
}
return result
}
Expand All @@ -58,7 +67,11 @@ class Erc20AllowanceService(
?.let { EthUInt256(it) }
}

suspend fun onChainUpdate(balanceId: BalanceId, event: Erc20MarkedEvent?) {
suspend fun onChainUpdate(
balanceId: BalanceId,
eventTimeMarks: EventTimeMarks?,
event: Erc20Event?
) {
optimisticLock {
val erc20Allowance = get(balanceId) ?: Erc20Allowance(
token = balanceId.token,
Expand All @@ -71,7 +84,7 @@ class Erc20AllowanceService(
logger.error("Can't get allowance $balanceId from blockchain")
throw IllegalStateException("Can't get allowance $balanceId from blockchain")
}
update(entity = erc20Allowance.withAllowance(allowance), event = event)
update(entity = erc20Allowance.withAllowance(allowance), event = event, eventTimeMarks = eventTimeMarks)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ class Erc20EventChainUpdateService(
}
val approvalEvent = erc20Events.lastOrNull { it?.event is Erc20Event.Erc20TokenApprovalEvent }
if (approvalEvent != null) {
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = approvalEvent)
erc20AllowanceService.onChainUpdate(
balanceId = balanceId,
event = approvalEvent.event,
eventTimeMarks = approvalEvent.eventTimeMarks
)
}
val balance = erc20BalanceService.onChainUpdate(balanceId, erc20Events.last())
if (balance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ internal class Erc20AllowanceServiceTest {

coEvery { erc20BalanceEventListener.onUpdate(any()) } returns Unit

erc20AllowanceService.onChainUpdate(balanceId, null)
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null)

coVerify {
erc20BalanceEventListener.onUpdate(withArg {
Expand Down Expand Up @@ -141,7 +141,7 @@ internal class Erc20AllowanceServiceTest {

coEvery { erc20BalanceEventListener.onUpdate(any()) } returns Unit

erc20AllowanceService.onChainUpdate(balanceId, null)
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null)

coVerify {
erc20BalanceEventListener.onUpdate(withArg {
Expand Down Expand Up @@ -179,7 +179,7 @@ internal class Erc20AllowanceServiceTest {

assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
runBlocking {
erc20AllowanceService.onChainUpdate(balanceId, null)
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ class Erc20EventChainUpdateServiceTest {
).map { LogRecordEvent(it, false, EventTimeMarks("test", emptyList())) }

coEvery { balanceService.onChainUpdate(balanceId, any()) } returns mockk()
coEvery { allowanceService.onChainUpdate(balanceId, any()) } returns Unit
coEvery { allowanceService.onChainUpdate(balanceId, any(), any()) } returns Unit

service.onEntityEvents(events)

coVerify { balanceService.onChainUpdate(balanceId, any()) }
coVerify { allowanceService.onChainUpdate(balanceId, any()) }
coVerify { allowanceService.onChainUpdate(balanceId, any(), any()) }
coVerify(exactly = 0) { delegate.onEntityEvents(any()) }
}
}
4 changes: 4 additions & 0 deletions erc20/listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>com.rarible.protocol</groupId>
<artifactId>protocol-erc20-indexer-core</artifactId>
</dependency>
<dependency>
<groupId>com.rarible.protocol.ethereum</groupId>
<artifactId>protocol-subscriber-order-starter</artifactId>
</dependency>
<dependency>
<groupId>com.rarible.protocol.ethereum</groupId>
<artifactId>protocol-subscriber-erc20-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@ package com.rarible.protocol.erc20.listener.configuration
import com.github.cloudyrock.spring.v5.EnableMongock
import com.rarible.core.application.ApplicationEnvironmentInfo
import com.rarible.core.kafka.RaribleKafkaConsumerFactory
import com.rarible.core.kafka.RaribleKafkaConsumerSettings
import com.rarible.core.kafka.RaribleKafkaConsumerWorker
import com.rarible.ethereum.contract.EnableContractService
import com.rarible.ethereum.converters.EnableScaletherMongoConversions
import com.rarible.ethereum.domain.Blockchain
import com.rarible.protocol.dto.ActivityTopicProvider
import com.rarible.protocol.dto.Erc20BalanceEventDto
import com.rarible.protocol.dto.EthActivityEventDto
import com.rarible.protocol.erc20.api.subscriber.Erc20IndexerEventsConsumerFactory
import com.rarible.protocol.erc20.core.configuration.ProducerConfiguration
import com.rarible.protocol.erc20.core.event.Erc20EventPublisher
import com.rarible.protocol.erc20.core.event.KafkaErc20BalanceEventListener
import com.rarible.protocol.erc20.core.metric.CheckerMetrics
import com.rarible.protocol.erc20.core.metric.DescriptorMetrics
import com.rarible.protocol.erc20.listener.listener.OrderActivityEventHandler
import com.rarible.protocol.erc20.listener.scanner.BalanceBatchCheckerHandler
import com.rarible.protocol.order.api.subscriber.autoconfigure.OrderIndexerEventsSubscriberProperties
import io.micrometer.core.instrument.MeterRegistry
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -31,12 +37,16 @@ import scalether.core.MonoEthereum
class Erc20ListenerConfiguration(
private val environmentInfo: ApplicationEnvironmentInfo,
private val commonProperties: Erc20ListenerProperties,
private val erc20IndexerEventsConsumerFactory: Erc20IndexerEventsConsumerFactory
private val erc20IndexerEventsConsumerFactory: Erc20IndexerEventsConsumerFactory,
private val orderIndexerEventsSubscriberProperties: OrderIndexerEventsSubscriberProperties,
) {

private val erc20BalanceConsumerGroup =
"protocol.${commonProperties.blockchain.value}.erc20.indexer.erc20-balance"

private val erc20OrderActivityConsumerGroup =
"protocol.${commonProperties.blockchain.value}.erc20.indexer.order-activity"

@Bean
fun raribleKafkaConsumerFactory(): RaribleKafkaConsumerFactory {
return RaribleKafkaConsumerFactory(
Expand Down Expand Up @@ -88,4 +98,29 @@ class Erc20ListenerConfiguration(
handler = BalanceBatchCheckerHandler(ethereum, checkerMetrics, commonProperties)
)
}

@Bean
fun orderActivityWorker(
checkerMetrics: CheckerMetrics,
raribleKafkaConsumerFactory: RaribleKafkaConsumerFactory,
orderActivityEventHandler: OrderActivityEventHandler,
): RaribleKafkaConsumerWorker<EthActivityEventDto> {
val settings = RaribleKafkaConsumerSettings(
hosts = orderIndexerEventsSubscriberProperties.brokerReplicaSet,
topic = ActivityTopicProvider.getActivityTopic(
environment = environmentInfo.name,
blockchain = blockchain().value
),
group = erc20OrderActivityConsumerGroup,
concurrency = commonProperties.orderActivityProperties.eventsHandleConcurrency,
batchSize = commonProperties.orderActivityProperties.eventsHandleBatchSize,
async = false,
offsetResetStrategy = OffsetResetStrategy.LATEST,
valueClass = EthActivityEventDto::class.java,
)
return raribleKafkaConsumerFactory.createWorker(
settings = settings,
handler = orderActivityEventHandler,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ data class Erc20ListenerProperties(
val eventConsumerWorker: DaemonWorkerProperties = DaemonWorkerProperties(),
val skipTransferContracts: List<String> = emptyList(),
val balanceCheckerProperties: BalanceCheckerProperties = BalanceCheckerProperties(),
val orderActivityProperties: OrderActivityProperties = OrderActivityProperties(),
val job: Erc20JobProperties = Erc20JobProperties()
)

data class OrderActivityProperties(
val eventsHandleBatchSize: Int = 200,
val eventsHandleConcurrency: Int = 9,
)

data class BalanceCheckerProperties(
val skipNumberOfBlocks: Long = 20,
val confirms: Int = 2,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.rarible.protocol.erc20.listener.listener

import com.rarible.core.kafka.RaribleKafkaEventHandler
import com.rarible.protocol.dto.Erc20AssetTypeDto
import com.rarible.protocol.dto.EthActivityEventDto
import com.rarible.protocol.dto.OrderActivityMatchDto
import com.rarible.protocol.dto.OrderActivityMatchSideDto
import com.rarible.protocol.dto.toModel
import com.rarible.protocol.erc20.core.misc.addIndexerIn
import com.rarible.protocol.erc20.core.model.BalanceId
import com.rarible.protocol.erc20.core.service.Erc20AllowanceService
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class OrderActivityEventHandler(
private val erc20AllowanceService: Erc20AllowanceService,
) : RaribleKafkaEventHandler<EthActivityEventDto> {

override suspend fun handle(event: EthActivityEventDto) {
val activity = event.activity
if (activity !is OrderActivityMatchDto) {
return
}
logger.info("Handle event: $event")
if (activity.type == OrderActivityMatchDto.Type.ACCEPT_BID) {
handleActivity(activity, event)
}
}

private suspend fun handleActivity(
activity: OrderActivityMatchDto,
event: EthActivityEventDto
) {
val sideDto = if (activity.left.type == OrderActivityMatchSideDto.Type.BID) {
activity.left
} else {
activity.right
}
val assetType = sideDto.asset.assetType
if (assetType is Erc20AssetTypeDto) {
val owner = sideDto.maker
val token = assetType.contract
logger.info(
"Will recalculate erc20 balance owner=$owner, token=$token. " +
"After bid execution id=${sideDto.hash}"
)
erc20AllowanceService.onChainUpdate(
balanceId = BalanceId(token = token, owner = owner),
eventTimeMarks = event.eventTimeMarks.toModel().addIndexerIn(),
event = null,
)
}
}

companion object {
private val logger = LoggerFactory.getLogger(OrderActivityEventHandler::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ protocol:
erc20:
subscriber:
broker-replica-set: ${kafka.hosts}
order:
subscriber:
broker-replica-set: ${kafka.hosts}

common:
kafka-replica-set: ${kafka.hosts}
Loading

0 comments on commit ebb7b95

Please sign in to comment.