Skip to content

Commit

Permalink
feat: add opensea event integration
Browse files Browse the repository at this point in the history
  • Loading branch information
mystdeim committed Aug 3, 2023
1 parent 9ba2afe commit fe04eda
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 2 deletions.
4 changes: 4 additions & 0 deletions order/listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@
<groupId>com.rarible</groupId>
<artifactId>rarible-opensea-client-starter</artifactId>
</dependency>
<dependency>
<groupId>com.rarible</groupId>
<artifactId>rarible-opensea-subscriber</artifactId>
</dependency>
<dependency>
<groupId>com.rarible</groupId>
<artifactId>rarible-reservoir-client-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.rarible.core.task.EnableRaribleTask
import com.rarible.ethereum.contract.EnableContractService
import com.rarible.ethereum.converters.EnableScaletherMongoConversions
import com.rarible.ethereum.domain.Blockchain
import com.rarible.opensea.subscriber.OpenseaConsumerFactory
import com.rarible.opensea.subscriber.model.OpenseaEvent
import com.rarible.protocol.dto.Erc20BalanceEventDto
import com.rarible.protocol.dto.NftItemEventDto
import com.rarible.protocol.dto.NftOwnershipEventDto
Expand Down Expand Up @@ -37,6 +39,7 @@ import com.rarible.protocol.order.listener.service.opensea.ExternalUserAgentProv
import com.rarible.protocol.order.listener.service.opensea.OpenSeaOrderConverter
import com.rarible.protocol.order.listener.service.opensea.OpenSeaOrderService
import com.rarible.protocol.order.listener.service.opensea.OpenSeaOrderValidator
import com.rarible.protocol.order.listener.service.opensea.OpenseaEventHandler
import com.rarible.protocol.order.listener.service.opensea.SeaportOrderLoadHandler
import com.rarible.protocol.order.listener.service.opensea.SeaportOrderLoader
import com.rarible.protocol.order.listener.service.order.OrderStartEndCheckerHandler
Expand Down Expand Up @@ -74,6 +77,8 @@ class OrderListenerConfiguration(
"protocol.${commonProperties.blockchain.value}.order.indexer.ownership"
private val itemConsumerGroup =
"protocol.${commonProperties.blockchain.value}.order.indexer.item"
private val openseaEventConsumerGroup =
"protocol.${commonProperties.blockchain.value}.order.indexer.opensea.event"

@Bean
fun blockchain(): Blockchain {
Expand Down Expand Up @@ -128,6 +133,11 @@ class OrderListenerConfiguration(
return listenerProperties.reservoir
}

@Bean
fun openseaEventProperties(): OpenseaEventProperties {
return listenerProperties.openseaEventProperties
}

@Bean
fun erc20BalanceChangeWorker(
factory: RaribleKafkaConsumerFactory,
Expand All @@ -145,6 +155,28 @@ class OrderListenerConfiguration(
)
}

@Bean
fun openseaEventWorker(
factory: RaribleKafkaConsumerFactory,
handler: OpenseaEventHandler
): RaribleKafkaConsumerWorker<OpenseaEvent> {
val openseaConsumerFactory = OpenseaConsumerFactory(
brokerReplicaSet = commonProperties.kafkaReplicaSet,
host = environmentInfo.host,
environment = environmentInfo.name
)
val settings = openseaConsumerFactory.createEventConsumerSettings(
group = openseaEventConsumerGroup,
concurrency = listenerProperties.openseaEventProperties.workerCount,
blockchain = commonProperties.blockchain.value,
batchSize = listenerProperties.openseaEventProperties.saveBatchSize
)
return factory.createWorker(
settings = settings,
handler = handler
)
}

@Bean
fun ownershipChangeWorker(
factory: RaribleKafkaConsumerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ data class OrderListenerProperties(
val eventConsumerWorker: DaemonWorkerProperties = DaemonWorkerProperties(),
val logConsumeWorkerCount: Int = 9,
val logConsumeWorkerBatchSize: Int = 500,
val floorPriceTopCollectionsCount: Int = 10
val floorPriceTopCollectionsCount: Int = 10,
val openseaEventProperties: OpenseaEventProperties = OpenseaEventProperties()
) {
enum class OrderSide {
ALL,
Expand Down Expand Up @@ -163,3 +164,9 @@ data class ReservoirProperties(
val pollingPeriod: Duration = Duration.ofSeconds(60),
val errorDelay: Duration = Duration.ofSeconds(1)
)

data class OpenseaEventProperties(
val cancelEnabled: Boolean = true,
val saveBatchSize: Int = 50,
val workerCount: Int = 1
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.rarible.protocol.order.listener.service.opensea

import com.rarible.core.common.EventTimeMark
import com.rarible.core.common.EventTimeMarks
import com.rarible.core.kafka.RaribleKafkaEventHandler
import com.rarible.core.logging.addToMdc
import com.rarible.opensea.subscriber.model.OpenseaEvent
import com.rarible.opensea.subscriber.model.OpenseaItemCancelled
import com.rarible.protocol.order.core.misc.orderOffchainEventMarks
import com.rarible.protocol.order.core.model.OrderStatus
import com.rarible.protocol.order.core.repository.order.OrderRepository
import com.rarible.protocol.order.core.service.OrderCancelService
import com.rarible.protocol.order.listener.configuration.OpenseaEventProperties
import com.rarible.protocol.order.listener.misc.ForeignOrderMetrics
import io.daonomic.rpc.domain.Word
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component


@Component
class OpenseaEventHandler(
private val orderCancelService: OrderCancelService,
private val properties: OpenseaEventProperties,
private val orderRepository: OrderRepository,
private val foreignOrderMetrics: ForeignOrderMetrics,
) : RaribleKafkaEventHandler<OpenseaEvent> {

private val logger: Logger = LoggerFactory.getLogger(javaClass)

override suspend fun handle(event: OpenseaEvent) {
val timeMarks = event.eventTimeMarks?.let { marks ->
EventTimeMarks(
source = marks.source,
marks = marks.marks.map { EventTimeMark(it.name, it.date) }
)
} ?: orderOffchainEventMarks()
when (val payload = event.payload) {
is OpenseaItemCancelled -> handleCancelEvent(payload, timeMarks)
}
}

private suspend fun handleCancelEvent(event: OpenseaItemCancelled, eventTimeMarks: EventTimeMarks) {
val id = Word.apply(event.orderHash)
val order = orderRepository.findById(id) ?: return
if (order.status == OrderStatus.ACTIVE) {
logger.warn(
"Found canceled order but active in the database ${order.type}: $id. Will cancel: ${properties.cancelEnabled}"
)
foreignOrderMetrics.onOrderInconsistency(platform = order.platform, status = order.status.name)
if (properties.cancelEnabled) {
addToMdc("orderType" to order.type.name) {
logger.info("Unexpected order cancellation: ${order.type}:${order.hash}")
}
orderCancelService.cancelOrder(
id = id,
eventTimeMarksDto = eventTimeMarks,
)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.rarible.protocol.order.listener.service.opensea

import com.rarible.core.kafka.KafkaMessage
import com.rarible.core.kafka.RaribleKafkaProducer
import com.rarible.core.kafka.json.JsonSerializer
import com.rarible.core.test.wait.Wait
import com.rarible.opensea.subscriber.OpenseaTopicProvider
import com.rarible.opensea.subscriber.model.OpenseaEvent
import com.rarible.opensea.subscriber.model.OpenseaEventType
import com.rarible.opensea.subscriber.model.OpenseaItemCancelled
import com.rarible.protocol.order.core.model.OrderStatus
import com.rarible.protocol.order.listener.data.createOrder
import com.rarible.protocol.order.listener.integration.AbstractIntegrationTest
import com.rarible.protocol.order.listener.integration.IntegrationTest
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.time.Instant
import java.util.UUID

@IntegrationTest
class OpenseaEventHandlerTest : AbstractIntegrationTest() {

@Test
fun `check order cancellation`() = runBlocking {

val order = createOrder().copy(status = OrderStatus.ACTIVE)
orderVersionRepository.save(order.toOrderVersion()).awaitSingle()
orderRepository.save(order)

val producer = RaribleKafkaProducer(
clientId = UUID.randomUUID().toString(),
valueSerializerClass = JsonSerializer::class.java,
valueClass = OpenseaEvent::class.java,
defaultTopic = OpenseaTopicProvider.getEventTopic("e2e", "ethereum"),
bootstrapServers = orderIndexerProperties.kafkaReplicaSet
)
producer.send(
KafkaMessage(
key = "",
value = OpenseaEvent(
eventId = "",
event = OpenseaEventType.ITEM_CANCELLED,
payload = OpenseaItemCancelled(
orderHash = order.id.hash.prefixed(),
eventTimestamp = Instant.now(),
maker = null
)
),
headers = mapOf(),
id = ""
)
).ensureSuccess()

Wait.waitAssert {
val updatedOrder = orderRepository.findById(order.hash)
assertThat(updatedOrder?.status).isEqualTo(OrderStatus.CANCELLED)
}
}
}
5 changes: 5 additions & 0 deletions order/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<artifactId>rarible-opensea-client-starter</artifactId>
<version>${opensea-client-starter.version}</version>
</dependency>
<dependency>
<groupId>com.rarible</groupId>
<artifactId>rarible-opensea-subscriber</artifactId>
<version>${opensea-client-starter.version}</version>
</dependency>
<dependency>
<groupId>com.rarible.x2y2</groupId>
<artifactId>rarible-x2y2-client-starter</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

<rarible.protocol.ethereum.version>1.56.0</rarible.protocol.ethereum.version>
<rarible.protocol.currency.version>1.38.4</rarible.protocol.currency.version>
<opensea-client-starter.version>2.1.17</opensea-client-starter.version>
<opensea-client-starter.version>2.1.18</opensea-client-starter.version>
<looksrare-client-starter.version>1.2.6</looksrare-client-starter.version>
<x2y2-client.version>1.1.1</x2y2-client.version>
<reservoir-client.version>1.0.8</reservoir-client.version>
Expand Down

0 comments on commit fe04eda

Please sign in to comment.