Skip to content

Commit

Permalink
add records
Browse files Browse the repository at this point in the history
  • Loading branch information
ChaimaaeROUAI committed May 31, 2024
1 parent 62f0993 commit ef76982
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 145 deletions.
23 changes: 4 additions & 19 deletions src/events/app/src/main/java/events/model/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,14 @@

import io.micronaut.serde.annotation.Serdeable;

import java.util.Collections;

import java.util.Map;
import java.util.Objects;


/**
* The event to track.
*/
@Serdeable
public class Event {

private final String type;
private final Map<String, String> detail;

public Event(String type, Map<String, String> detail) {
this.type = Objects.requireNonNull(type, "Type cannot be null");
this.detail = detail != null ? detail : Collections.emptyMap();
}

public String getType() {
return type;
}

public Map<String, String> getDetail() {
return detail;
}
public record Event(String type,Map<String, String> detail){
}

38 changes: 13 additions & 25 deletions src/events/app/src/main/java/events/model/EventRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,27 @@
/**
* Event record.
*/
@Serdeable
public class EventRecord extends Event {
// import java.time.Instant;
// import java.util.Map;
// import java.util.Objects;

private final String source;
private final String track;
private final Instant time;
@Serdeable
public record EventRecord(String source, String track, Instant time, Event event) {

public EventRecord(String source,
String track,
Event event) {
this(source, track, Objects.requireNonNull(event, "Event cannot be null").getType(), event.getDetail());
public EventRecord(String source, String track, Event event) {
this(source, track, Instant.now(), Objects.requireNonNull(event, "Event cannot be null"));
}

@Creator
public EventRecord(String source,
String track,
String type,
Map<String, String> detail) {
super(type, detail);
this.source = source;
this.track = track;
time = Instant.now();
}

public String getSource() {
return source;
public EventRecord(String source, String track, String type, Map<String, String> detail) {
this(source, track, Instant.now(), new Event(type, detail));
}

public String getTrack() {
return track;
public String type() {
return event.type();
}

public Instant getTime() {
return time;
public Map<String, String> detail() {
return event.detail();
}
}
42 changes: 22 additions & 20 deletions src/events/app/src/main/java/events/model/EventsReceived.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,29 @@
* Defines whether events were received.
*/
@Serdeable
public class EventsReceived {
public record EventsReceived(boolean success ,int events){
}
// public class EventsReceived {

private final boolean success;
private final int events;
// private final boolean success;
// private final int events;

public EventsReceived(boolean success, int events) {
this.success = success;
this.events = events;
}
// public EventsReceived(boolean success, int events) {
// this.success = success;
// this.events = events;
// }

/**
* True if the events were successfully received
*/
public boolean isSuccess() {
return success;
}
// /**
// * True if the events were successfully received
// */
// public boolean isSuccess() {
// return success;
// }

/**
* The number of successfully received events
*/
public int getEvents() {
return events;
}
}
// /**
// * The number of successfully received events
// */
// public int getEvents() {
// return events;
// }
// }
38 changes: 19 additions & 19 deletions src/events/app/src/main/java/events/service/EventProducer.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package events.service;

import events.model.EventRecord;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

/**
* The messaging Kafka {@link EventRecord} producer.
*
* The annotation {@link KafkaClient} is handled by {@link io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice}
* which introduces new bean that is able to handle publishing of {@link EventRecord}s to the Kafka.
*/
@KafkaClient(batch = true)
public interface EventProducer {
String EVENT_TOPIC_NAME = "events";

@Topic(EVENT_TOPIC_NAME)
void send(EventRecord... eventRecords);
}
package events.service;

import events.model.EventRecord;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;

/**
* The messaging Kafka {@link EventRecord} producer.
*
* The annotation {@link KafkaClient} is handled by {@link io.micronaut.configuration.kafka.intercept.KafkaClientIntroductionAdvice}
* which introduces new bean that is able to handle publishing of {@link EventRecord}s to the Kafka.
*/
@KafkaClient(batch = true)
public interface EventProducer {
String EVENT_TOPIC_NAME = "events";

@Topic(EVENT_TOPIC_NAME)
void send(EventRecord...eventRecords);
}
94 changes: 47 additions & 47 deletions src/events/app/src/main/java/events/service/EventService.java
Original file line number Diff line number Diff line change
@@ -1,47 +1,47 @@
package events.service;

import events.model.Event;
import events.model.EventRecord;
import events.model.EventsReceived;
import io.micronaut.tracing.annotation.NewSpan;
import io.micronaut.tracing.annotation.SpanTag;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

@Singleton
public class EventService {

private static final Logger LOG = LoggerFactory.getLogger(EventService.class);

private final EventProducer eventProducer;

public EventService(EventProducer eventProducer) {
this.eventProducer = eventProducer;
}

@NewSpan("receive events")
public EventsReceived postEvents(@SpanTag String source,
String track,
Event... events) {
final int numEvents = events.length;
try {
LOG.debug("Posting Events (source: {}, track {}, length {})", source, track, numEvents);
if (numEvents == 0) {
return new EventsReceived(false, 0);
}

final EventRecord[] eventRecords = Arrays.stream(events)
.map(ev -> new EventRecord(source, track, ev))
.toArray(EventRecord[]::new);

eventProducer.send(eventRecords);
return new EventsReceived(true, events.length);
} catch (Exception e) {
LOG.error("Unable to process events: {}", e.getMessage(), e);
return new EventsReceived(false, events.length);
}
}
}
package events.service;

import events.model.Event;
import events.model.EventRecord;
import events.model.EventsReceived;
import io.micronaut.tracing.annotation.NewSpan;
import io.micronaut.tracing.annotation.SpanTag;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

@Singleton
public class EventService {

private static final Logger LOG = LoggerFactory.getLogger(EventService.class);

private final EventProducer eventProducer;

public EventService(EventProducer eventProducer) {
this.eventProducer = eventProducer;
}

@NewSpan("receive events")
public EventsReceived postEvents(@SpanTag String source,
String track,
Event... events) {
final int numEvents = events.length;
try {
LOG.debug("Posting Events (source: {}, track {}, length {})", source, track, numEvents);
if (numEvents == 0) {
return new EventsReceived(false, 0);
}

final EventRecord[] eventRecords = Arrays.stream(events)
.map(ev -> new EventRecord(source, track, ev))
.toArray(EventRecord[]::new);

eventProducer.send(eventRecords);
return new EventsReceived(true, events.length);
} catch (Exception e) {
LOG.error("Unable to process events: {}", e.getMessage(), e);
return new EventsReceived(false, events.length);
}
}
}
7 changes: 0 additions & 7 deletions src/events/app/src/main/resources/application-app.yml

This file was deleted.

10 changes: 10 additions & 0 deletions src/events/buildSrc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
plugins {
id 'groovy-gradle-plugin'
}
repositories {
mavenCentral()
gradlePluginPortal()
}
dependencies {
implementation("com.bmuschko:gradle-docker-plugin:9.4.0")
}
17 changes: 9 additions & 8 deletions src/events/tck/src/main/java/events/AbstractEventsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,29 @@ void testPostEvents() {
final String source = "client";
final String track = "abcxyz";
final String type = "pageView";
System.out.println("Client posting event: " + type);
final EventsReceived eventsReceived = client.postEvents(
source,
track,
new Event(type, details)
);

assertNotNull(eventsReceived);
assertTrue(eventsReceived.isSuccess());
assertEquals(1, eventsReceived.getEvents());
assertTrue(eventsReceived.success());
assertEquals(1, eventsReceived.events());
assertEventReceived(source, track, type, details);
}

private void assertEventReceived(String source, String track, String type, Map<String, String> details) {
await().atMost(30, SECONDS).until(() -> !eventsListener.received.isEmpty());
await().atMost(60, SECONDS).until(() -> !eventsListener.received.isEmpty());

final EventRecord eventRecord = eventsListener.received.stream().findFirst().orElse(null);
assertNotNull(eventRecord);
assertEquals(source, eventRecord.getSource());
assertEquals(track, eventRecord.getTrack());
assertNotNull(eventRecord.getTime());
assertEquals(type, eventRecord.getType());
assertEquals(details, eventRecord.getDetail());
assertEquals(source, eventRecord.source());
assertEquals(track, eventRecord.track());
assertNotNull(eventRecord.time());
assertEquals(type, eventRecord.type());
assertEquals(details, eventRecord.detail());
}


Expand Down

0 comments on commit ef76982

Please sign in to comment.