Skip to content

Commit 3d950b0

Browse files
mcruzdevsalaboy
andauthored
Add statestore example with Outbox pattern (#1582)
* Add statestore example with Outbox pattern Signed-off-by: Matheus Cruz <[email protected]> * Clean events after each test Signed-off-by: Matheus Cruz <[email protected]> * Add license header Signed-off-by: Matheus Cruz <[email protected]> * Apply pull request suggestions Signed-off-by: Matheus Cruz <[email protected]> --------- Signed-off-by: Matheus Cruz <[email protected]> Co-authored-by: salaboy <[email protected]>
1 parent e8f5deb commit 3d950b0

File tree

9 files changed

+360
-7
lines changed

9 files changed

+360
-7
lines changed
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.it.testcontainers.pubsub.outbox;
15+
16+
import io.dapr.client.DaprClient;
17+
import io.dapr.client.domain.ExecuteStateTransactionRequest;
18+
import io.dapr.client.domain.State;
19+
import io.dapr.client.domain.TransactionalStateOperation;
20+
import io.dapr.it.testcontainers.DaprClientFactory;
21+
import io.dapr.testcontainers.Component;
22+
import io.dapr.testcontainers.DaprContainer;
23+
import io.dapr.testcontainers.DaprLogLevel;
24+
import org.assertj.core.api.Assertions;
25+
import org.awaitility.Awaitility;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Tag;
28+
import org.junit.jupiter.api.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import org.springframework.boot.test.context.SpringBootTest;
32+
import org.springframework.test.context.DynamicPropertyRegistry;
33+
import org.springframework.test.context.DynamicPropertySource;
34+
import org.testcontainers.containers.Network;
35+
import org.testcontainers.containers.wait.strategy.Wait;
36+
import org.testcontainers.junit.jupiter.Container;
37+
import org.testcontainers.junit.jupiter.Testcontainers;
38+
39+
import java.time.Duration;
40+
import java.util.Collections;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Random;
44+
45+
import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;
46+
47+
@SpringBootTest(
48+
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
49+
classes = {
50+
TestPubsubOutboxApplication.class
51+
}
52+
)
53+
@Testcontainers
54+
@Tag("testcontainers")
55+
public class DaprPubSubOutboxIT {
56+
57+
private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubOutboxIT.class);
58+
private static final Network DAPR_NETWORK = Network.newNetwork();
59+
private static final Random RANDOM = new Random();
60+
private static final int PORT = RANDOM.nextInt(1000) + 8000;
61+
private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*";
62+
63+
private static final String PUBSUB_APP_ID = "pubsub-dapr-app";
64+
private static final String PUBSUB_NAME = "pubsub";
65+
66+
// topics
67+
private static final String TOPIC_PRODUCT_CREATED = "product.created";
68+
private static final String STATE_STORE_NAME = "kvstore";
69+
70+
@Container
71+
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
72+
.withAppName(PUBSUB_APP_ID)
73+
.withNetwork(DAPR_NETWORK)
74+
.withComponent(new Component(STATE_STORE_NAME, "state.in-memory", "v1", Map.of(
75+
"outboxPublishPubsub", PUBSUB_NAME,
76+
"outboxPublishTopic", TOPIC_PRODUCT_CREATED
77+
)))
78+
.withComponent(new Component(PUBSUB_NAME, "pubsub.in-memory", "v1", Collections.emptyMap()))
79+
.withDaprLogLevel(DaprLogLevel.DEBUG)
80+
.withLogConsumer(outputFrame -> LOG.info(outputFrame.getUtf8String()))
81+
.withAppChannelAddress("host.testcontainers.internal")
82+
.withAppPort(PORT);
83+
84+
/**
85+
* Expose the Dapr ports to the host.
86+
*
87+
* @param registry the dynamic property registry
88+
*/
89+
@DynamicPropertySource
90+
static void daprProperties(DynamicPropertyRegistry registry) {
91+
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
92+
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
93+
registry.add("server.port", () -> PORT);
94+
}
95+
96+
97+
@BeforeEach
98+
public void setUp() {
99+
org.testcontainers.Testcontainers.exposeHostPorts(PORT);
100+
}
101+
102+
103+
@Test
104+
public void shouldPublishUsingOutbox() throws Exception {
105+
Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER);
106+
107+
try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) {
108+
109+
ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest(STATE_STORE_NAME);
110+
111+
Product pencil = new Product("Pencil", 1.50);
112+
State<Product> state = new State<>(
113+
pencil.getId(), pencil, null
114+
);
115+
116+
TransactionalStateOperation<Product> operation = new TransactionalStateOperation<>(
117+
TransactionalStateOperation.OperationType.UPSERT, state
118+
);
119+
120+
transactionRequest.setOperations(List.of(operation));
121+
122+
client.executeStateTransaction(transactionRequest).block();
123+
124+
Awaitility.await().atMost(Duration.ofSeconds(10))
125+
.ignoreExceptions()
126+
.untilAsserted(() -> Assertions.assertThat(ProductWebhookController.EVENT_LIST).isNotEmpty());
127+
}
128+
}
129+
130+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.pubsub.outbox;
14+
15+
import java.util.UUID;
16+
17+
public class Product {
18+
private String id;
19+
private String name;
20+
private double price;
21+
22+
public Product() {
23+
}
24+
25+
public Product(String name, double price) {
26+
this.id = UUID.randomUUID().toString();
27+
this.name = name;
28+
this.price = price;
29+
}
30+
31+
public String getId() {
32+
return id;
33+
}
34+
35+
public void setId(String id) {
36+
this.id = id;
37+
}
38+
39+
public String getName() {
40+
return name;
41+
}
42+
43+
public void setName(String name) {
44+
this.name = name;
45+
}
46+
47+
public double getPrice() {
48+
return price;
49+
}
50+
51+
public void setPrice(double price) {
52+
this.price = price;
53+
}
54+
55+
@Override
56+
public String toString() {
57+
return "Product{" +
58+
"id='" + id + '\'' +
59+
", name='" + name + '\'' +
60+
", price=" + price +
61+
'}';
62+
}
63+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.pubsub.outbox;
14+
15+
import io.dapr.Topic;
16+
import io.dapr.client.domain.CloudEvent;
17+
import org.springframework.web.bind.annotation.PostMapping;
18+
import org.springframework.web.bind.annotation.RequestBody;
19+
import org.springframework.web.bind.annotation.RequestMapping;
20+
import org.springframework.web.bind.annotation.RestController;
21+
22+
import java.util.List;
23+
import java.util.concurrent.CopyOnWriteArrayList;
24+
25+
@RestController
26+
@RequestMapping("/webhooks/products")
27+
public class ProductWebhookController {
28+
29+
public static final List<CloudEvent<Product>> EVENT_LIST = new CopyOnWriteArrayList<>();
30+
31+
@PostMapping("/created")
32+
@Topic(name = "product.created", pubsubName = "pubsub")
33+
public void handleEvent(@RequestBody CloudEvent cloudEvent) {
34+
System.out.println("Received product.created event: " + cloudEvent.getData());
35+
EVENT_LIST.add(cloudEvent);
36+
}
37+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.dapr.it.testcontainers.pubsub.outbox;
14+
15+
import org.springframework.boot.SpringApplication;
16+
import org.springframework.boot.autoconfigure.SpringBootApplication;
17+
18+
@SpringBootApplication
19+
public class TestPubsubOutboxApplication {
20+
public static void main(String[] args) {
21+
SpringApplication.run(TestPubsubOutboxApplication.class, args);
22+
}
23+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.dapr.springboot.examples.producer;
2+
3+
public class OrderDTO {
4+
5+
private String id;
6+
private String item;
7+
private Integer amount;
8+
9+
public OrderDTO() {
10+
}
11+
12+
public OrderDTO(String id, String item, Integer amount) {
13+
this.id = id;
14+
this.item = item;
15+
this.amount = amount;
16+
}
17+
18+
public String getId() {
19+
return id;
20+
}
21+
22+
23+
public String getItem() {
24+
return item;
25+
}
26+
27+
public Integer getAmount() {
28+
return amount;
29+
}
30+
31+
}

spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrdersRestController.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313

1414
package io.dapr.springboot.examples.producer;
1515

16+
import io.dapr.client.DaprClient;
17+
import io.dapr.client.domain.ExecuteStateTransactionRequest;
18+
import io.dapr.client.domain.State;
19+
import io.dapr.client.domain.TransactionalStateOperation;
1620
import io.dapr.spring.data.repository.config.EnableDaprRepositories;
1721
import io.dapr.spring.messaging.DaprMessagingTemplate;
1822
import org.slf4j.Logger;
@@ -24,18 +28,23 @@
2428
import org.springframework.web.bind.annotation.RequestParam;
2529
import org.springframework.web.bind.annotation.RestController;
2630

31+
import java.util.List;
32+
2733
@RestController
2834
@EnableDaprRepositories
2935
public class OrdersRestController {
3036

31-
private final Logger logger = LoggerFactory.getLogger(OrdersRestController.class);
37+
private static final Logger logger = LoggerFactory.getLogger(OrdersRestController.class);
3238

3339
@Autowired
3440
private OrderRepository repository;
3541

3642
@Autowired
3743
private DaprMessagingTemplate<Order> messagingTemplate;
3844

45+
@Autowired
46+
private DaprClient daprClient;
47+
3948
/**
4049
* Store orders from customers.
4150
* @param order from the customer
@@ -51,6 +60,28 @@ public String storeOrder(@RequestBody Order order) {
5160
return "Order Stored and Event Published";
5261
}
5362

63+
@PostMapping("/orders/outbox")
64+
public String storeOrderOutbox(@RequestBody Order order) {
65+
logger.info("Storing Order with Outbox: {}", order);
66+
ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest("kvstore-outbox");
67+
68+
State<Order> state = new State<>(
69+
order.getId(), order, null
70+
);
71+
72+
TransactionalStateOperation<Order> operation = new TransactionalStateOperation<>(
73+
TransactionalStateOperation.OperationType.UPSERT, state
74+
);
75+
76+
transactionRequest.setOperations(List.of(operation));
77+
78+
daprClient.executeStateTransaction(transactionRequest).block();
79+
80+
logger.info("Order Stored with Outbox: {}", order);
81+
82+
return "Order Stored with Outbox";
83+
}
84+
5485
@GetMapping("/orders")
5586
public Iterable<Order> getAll() {
5687
return repository.findAll();

spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/DaprTestContainersConfig.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class DaprTestContainersConfig {
4141
static final String CONNECTION_STRING =
4242
"host=postgres user=postgres password=password port=5432 connect_timeout=10 database=dapr_db_repository";
4343
static final Map<String, String> STATE_STORE_PROPERTIES = createStateStoreProperties();
44-
44+
static final Map<String, String> STATE_STORE_OUTBOX_PROPERTIES = createStateStoreOutboxProperties();
4545
static final Map<String, String> BINDING_PROPERTIES = Collections.singletonMap("connectionString", CONNECTION_STRING);
4646

4747

@@ -118,9 +118,8 @@ public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> p
118118
.withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES))
119119
.withComponent(new Component("kvbinding", "bindings.postgresql", "v1", BINDING_PROPERTIES))
120120
.withComponent(new Component("pubsub", "pubsub.rabbitmq", "v1", rabbitMqProperties))
121+
.withComponent(new Component("kvstore-outbox", "state.postgresql", "v1", STATE_STORE_OUTBOX_PROPERTIES))
121122
.withSubscription(new Subscription("app", "pubsub", "topic", "/subscribe"))
122-
// .withDaprLogLevel(DaprLogLevel.DEBUG)
123-
// .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
124123
.withAppPort(8080)
125124
.withAppHealthCheckPath("/actuator/health")
126125
.withAppChannelAddress("host.testcontainers.internal")
@@ -139,5 +138,14 @@ private static Map<String, String> createStateStoreProperties() {
139138
return result;
140139
}
141140

141+
private static Map<String, String> createStateStoreOutboxProperties() {
142+
Map<String, String> result = new HashMap<>();
143+
result.put("connectionString", CONNECTION_STRING);
144+
result.put("outboxPublishPubsub", "pubsub");
145+
result.put("outboxPublishTopic", "outbox-topic");
146+
147+
return result;
148+
}
149+
142150

143151
}

0 commit comments

Comments
 (0)