Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(PT-3255): update approval on bid execution #714

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.rarible.protocol.erc20.core.service

import com.rarible.contracts.erc20.IERC20
import com.rarible.core.common.EventTimeMarks
import com.rarible.core.common.nowMillis
import com.rarible.core.common.optimisticLock
import com.rarible.core.entity.reducer.service.EntityService
Expand All @@ -10,6 +11,7 @@ import com.rarible.protocol.erc20.core.event.Erc20BalanceEventListener
import com.rarible.protocol.erc20.core.model.BalanceId
import com.rarible.protocol.erc20.core.model.Erc20Allowance
import com.rarible.protocol.erc20.core.model.Erc20AllowanceEvent
import com.rarible.protocol.erc20.core.model.Erc20Event
import com.rarible.protocol.erc20.core.model.Erc20MarkedEvent
import com.rarible.protocol.erc20.core.repository.Erc20AllowanceRepository
import kotlinx.coroutines.reactor.awaitSingleOrNull
Expand Down Expand Up @@ -41,14 +43,21 @@ class Erc20AllowanceService(
)
}

override suspend fun update(entity: Erc20Allowance, event: Erc20MarkedEvent?): Erc20Allowance {
override suspend fun update(entity: Erc20Allowance, event: Erc20MarkedEvent?): Erc20Allowance =
update(entity = entity, event = event?.event, eventTimeMarks = event?.eventTimeMarks)

private suspend fun update(
entity: Erc20Allowance,
eventTimeMarks: EventTimeMarks?,
event: Erc20Event?
): Erc20Allowance {
val result = if (saveToDb) {
erc20AllowanceRepository.save(entity)
} else {
entity
}
erc20BalanceEventListeners.forEach {
it.onUpdate(Erc20AllowanceEvent(event?.event, event?.eventTimeMarks, entity))
it.onUpdate(Erc20AllowanceEvent(event, eventTimeMarks, entity))
}
return result
}
Expand All @@ -58,7 +67,11 @@ class Erc20AllowanceService(
?.let { EthUInt256(it) }
}

suspend fun onChainUpdate(balanceId: BalanceId, event: Erc20MarkedEvent?) {
suspend fun onChainUpdate(
balanceId: BalanceId,
eventTimeMarks: EventTimeMarks?,
event: Erc20Event?
) {
optimisticLock {
val erc20Allowance = get(balanceId) ?: Erc20Allowance(
token = balanceId.token,
Expand All @@ -71,7 +84,7 @@ class Erc20AllowanceService(
logger.error("Can't get allowance $balanceId from blockchain")
throw IllegalStateException("Can't get allowance $balanceId from blockchain")
}
update(entity = erc20Allowance.withAllowance(allowance), event = event)
update(entity = erc20Allowance.withAllowance(allowance), event = event, eventTimeMarks = eventTimeMarks)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ class Erc20EventChainUpdateService(
}
val approvalEvent = erc20Events.lastOrNull { it?.event is Erc20Event.Erc20TokenApprovalEvent }
if (approvalEvent != null) {
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = approvalEvent)
erc20AllowanceService.onChainUpdate(
balanceId = balanceId,
event = approvalEvent.event,
eventTimeMarks = approvalEvent.eventTimeMarks
)
}
val balance = erc20BalanceService.onChainUpdate(balanceId, erc20Events.last())
if (balance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ internal class Erc20AllowanceServiceTest {

coEvery { erc20BalanceEventListener.onUpdate(any()) } returns Unit

erc20AllowanceService.onChainUpdate(balanceId, null)
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null)

coVerify {
erc20BalanceEventListener.onUpdate(withArg {
Expand Down Expand Up @@ -141,7 +141,7 @@ internal class Erc20AllowanceServiceTest {

coEvery { erc20BalanceEventListener.onUpdate(any()) } returns Unit

erc20AllowanceService.onChainUpdate(balanceId, null)
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null)

coVerify {
erc20BalanceEventListener.onUpdate(withArg {
Expand Down Expand Up @@ -179,7 +179,7 @@ internal class Erc20AllowanceServiceTest {

assertThatExceptionOfType(IllegalStateException::class.java).isThrownBy {
runBlocking {
erc20AllowanceService.onChainUpdate(balanceId, null)
erc20AllowanceService.onChainUpdate(balanceId = balanceId, event = null, eventTimeMarks = null)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ class Erc20EventChainUpdateServiceTest {
).map { LogRecordEvent(it, false, EventTimeMarks("test", emptyList())) }

coEvery { balanceService.onChainUpdate(balanceId, any()) } returns mockk()
coEvery { allowanceService.onChainUpdate(balanceId, any()) } returns Unit
coEvery { allowanceService.onChainUpdate(balanceId, any(), any()) } returns Unit

service.onEntityEvents(events)

coVerify { balanceService.onChainUpdate(balanceId, any()) }
coVerify { allowanceService.onChainUpdate(balanceId, any()) }
coVerify { allowanceService.onChainUpdate(balanceId, any(), any()) }
coVerify(exactly = 0) { delegate.onEntityEvents(any()) }
}
}
4 changes: 4 additions & 0 deletions erc20/listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
<groupId>com.rarible.protocol</groupId>
<artifactId>protocol-erc20-indexer-core</artifactId>
</dependency>
<dependency>
<groupId>com.rarible.protocol.ethereum</groupId>
<artifactId>protocol-subscriber-order-starter</artifactId>
</dependency>
<dependency>
<groupId>com.rarible.protocol.ethereum</groupId>
<artifactId>protocol-subscriber-erc20-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@ package com.rarible.protocol.erc20.listener.configuration
import com.github.cloudyrock.spring.v5.EnableMongock
import com.rarible.core.application.ApplicationEnvironmentInfo
import com.rarible.core.kafka.RaribleKafkaConsumerFactory
import com.rarible.core.kafka.RaribleKafkaConsumerSettings
import com.rarible.core.kafka.RaribleKafkaConsumerWorker
import com.rarible.ethereum.contract.EnableContractService
import com.rarible.ethereum.converters.EnableScaletherMongoConversions
import com.rarible.ethereum.domain.Blockchain
import com.rarible.protocol.dto.ActivityTopicProvider
import com.rarible.protocol.dto.Erc20BalanceEventDto
import com.rarible.protocol.dto.EthActivityEventDto
import com.rarible.protocol.erc20.api.subscriber.Erc20IndexerEventsConsumerFactory
import com.rarible.protocol.erc20.core.configuration.ProducerConfiguration
import com.rarible.protocol.erc20.core.event.Erc20EventPublisher
import com.rarible.protocol.erc20.core.event.KafkaErc20BalanceEventListener
import com.rarible.protocol.erc20.core.metric.CheckerMetrics
import com.rarible.protocol.erc20.core.metric.DescriptorMetrics
import com.rarible.protocol.erc20.listener.listener.OrderActivityEventHandler
import com.rarible.protocol.erc20.listener.scanner.BalanceBatchCheckerHandler
import com.rarible.protocol.order.api.subscriber.autoconfigure.OrderIndexerEventsSubscriberProperties
import io.micrometer.core.instrument.MeterRegistry
import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -31,12 +37,16 @@ import scalether.core.MonoEthereum
class Erc20ListenerConfiguration(
private val environmentInfo: ApplicationEnvironmentInfo,
private val commonProperties: Erc20ListenerProperties,
private val erc20IndexerEventsConsumerFactory: Erc20IndexerEventsConsumerFactory
private val erc20IndexerEventsConsumerFactory: Erc20IndexerEventsConsumerFactory,
private val orderIndexerEventsSubscriberProperties: OrderIndexerEventsSubscriberProperties,
) {

private val erc20BalanceConsumerGroup =
"protocol.${commonProperties.blockchain.value}.erc20.indexer.erc20-balance"

private val erc20OrderActivityConsumerGroup =
"protocol.${commonProperties.blockchain.value}.erc20.indexer.order-activity"

@Bean
fun raribleKafkaConsumerFactory(): RaribleKafkaConsumerFactory {
return RaribleKafkaConsumerFactory(
Expand Down Expand Up @@ -88,4 +98,29 @@ class Erc20ListenerConfiguration(
handler = BalanceBatchCheckerHandler(ethereum, checkerMetrics, commonProperties)
)
}

@Bean
fun orderActivityWorker(
checkerMetrics: CheckerMetrics,
raribleKafkaConsumerFactory: RaribleKafkaConsumerFactory,
orderActivityEventHandler: OrderActivityEventHandler,
): RaribleKafkaConsumerWorker<EthActivityEventDto> {
val settings = RaribleKafkaConsumerSettings(
hosts = orderIndexerEventsSubscriberProperties.brokerReplicaSet,
topic = ActivityTopicProvider.getActivityTopic(
environment = environmentInfo.name,
blockchain = blockchain().value
),
group = erc20OrderActivityConsumerGroup,
concurrency = commonProperties.orderActivityProperties.eventsHandleConcurrency,
batchSize = commonProperties.orderActivityProperties.eventsHandleBatchSize,
async = false,
offsetResetStrategy = OffsetResetStrategy.LATEST,
valueClass = EthActivityEventDto::class.java,
)
return raribleKafkaConsumerFactory.createWorker(
settings = settings,
handler = orderActivityEventHandler,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ data class Erc20ListenerProperties(
val eventConsumerWorker: DaemonWorkerProperties = DaemonWorkerProperties(),
val skipTransferContracts: List<String> = emptyList(),
val balanceCheckerProperties: BalanceCheckerProperties = BalanceCheckerProperties(),
val orderActivityProperties: OrderActivityProperties = OrderActivityProperties(),
val job: Erc20JobProperties = Erc20JobProperties()
)

data class OrderActivityProperties(
val eventsHandleBatchSize: Int = 200,
val eventsHandleConcurrency: Int = 9,
)

data class BalanceCheckerProperties(
val skipNumberOfBlocks: Long = 20,
val confirms: Int = 2,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.rarible.protocol.erc20.listener.listener

import com.rarible.core.kafka.RaribleKafkaEventHandler
import com.rarible.protocol.dto.Erc20AssetTypeDto
import com.rarible.protocol.dto.EthActivityEventDto
import com.rarible.protocol.dto.OrderActivityMatchDto
import com.rarible.protocol.dto.OrderActivityMatchSideDto
import com.rarible.protocol.dto.toModel
import com.rarible.protocol.erc20.core.misc.addIndexerIn
import com.rarible.protocol.erc20.core.model.BalanceId
import com.rarible.protocol.erc20.core.service.Erc20AllowanceService
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component

@Component
class OrderActivityEventHandler(
private val erc20AllowanceService: Erc20AllowanceService,
) : RaribleKafkaEventHandler<EthActivityEventDto> {

override suspend fun handle(event: EthActivityEventDto) {
logger.info("Handle event: $event")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is no sense to track all events, only those we are interested in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we track if something is missed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reduced amount of logs

val activity = event.activity
if (activity is OrderActivityMatchDto && activity.type == OrderActivityMatchDto.Type.ACCEPT_BID) {
val sideDto = if (activity.left.type == OrderActivityMatchSideDto.Type.BID) {
activity.left
} else {
activity.right
}
val assetType = sideDto.asset.assetType
if (assetType is Erc20AssetTypeDto) {
logger.info("Will recalculate erc20 balance")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provide more info in logs - which balance, from what order etc

val owner = sideDto.maker
val token = assetType.contract
erc20AllowanceService.onChainUpdate(
balanceId = BalanceId(token = token, owner = owner),
eventTimeMarks = event.eventTimeMarks.toModel().addIndexerIn(),
event = null,
)
}
}
}

companion object {
private val logger = LoggerFactory.getLogger(OrderActivityEventHandler::class.java)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ protocol:
erc20:
subscriber:
broker-replica-set: ${kafka.hosts}
order:
subscriber:
broker-replica-set: ${kafka.hosts}

common:
kafka-replica-set: ${kafka.hosts}
Loading
Loading