Skip to content

Commit d7ffb0b

Browse files
authored
Merge pull request #14 from anfeldma-ms/master
Removed latch construct
2 parents 8593404 + 9e1f28d commit d7ffb0b

File tree

3 files changed

+141
-139
lines changed

3 files changed

+141
-139
lines changed

pom.xml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,28 @@
5858
<dependency>
5959
<groupId>com.azure</groupId>
6060
<artifactId>azure-cosmos</artifactId>
61-
<version>4.0.1</version>
61+
<version>4.3.1</version>
6262
</dependency>
63+
64+
<dependency>
65+
<groupId>org.apache.logging.log4j</groupId>
66+
<artifactId>log4j-slf4j-impl</artifactId>
67+
<version>2.13.0</version>
68+
<scope>test</scope>
69+
</dependency>
70+
71+
<dependency>
72+
<groupId>org.apache.logging.log4j</groupId>
73+
<artifactId>log4j-api</artifactId>
74+
<version>2.11.1</version>
75+
<scope>test</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>org.slf4j</groupId>
79+
<artifactId>slf4j-jdk14</artifactId>
80+
<version>1.7.28</version>
81+
</dependency>
82+
6383
<dependency>
6484
<groupId>org.apache.commons</groupId>
6585
<artifactId>commons-lang3</artifactId>

src/main/java/com/azure/cosmos/sample/async/AsyncMain.java

Lines changed: 93 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
import com.azure.cosmos.sample.common.Families;
2121
import com.azure.cosmos.sample.common.Family;
2222
import com.azure.cosmos.util.CosmosPagedFlux;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325
import reactor.core.publisher.Flux;
2426
import reactor.core.publisher.Mono;
2527

2628
import java.time.Duration;
2729
import java.util.Collections;
28-
import java.util.concurrent.CountDownLatch;
2930
import java.util.stream.Collectors;
3031

3132
public class AsyncMain {
@@ -38,6 +39,8 @@ public class AsyncMain {
3839
private CosmosAsyncDatabase database;
3940
private CosmosAsyncContainer container;
4041

42+
protected static Logger logger = LoggerFactory.getLogger(AsyncMain.class.getSimpleName());
43+
4144
public void close() {
4245
client.close();
4346
}
@@ -52,14 +55,14 @@ public static void main(String[] args) {
5255
AsyncMain p = new AsyncMain();
5356

5457
try {
55-
System.out.println("Starting ASYNC main");
58+
logger.info("Starting ASYNC main");
5659
p.getStartedDemo();
57-
System.out.println("Demo complete, please hold while resources are released");
60+
logger.info("Demo complete, please hold while resources are released");
5861
} catch (Exception e) {
5962
e.printStackTrace();
60-
System.err.println(String.format("Cosmos getStarted failed with %s", e));
63+
logger.error("Cosmos getStarted failed with %s", e);
6164
} finally {
62-
System.out.println("Closing the client");
65+
logger.info("Closing the client");
6366
p.close();
6467
}
6568
System.exit(0);
@@ -68,7 +71,7 @@ public static void main(String[] args) {
6871
// </Main>
6972

7073
private void getStartedDemo() throws Exception {
71-
System.out.println("Using Azure Cosmos DB endpoint: " + AccountSettings.HOST);
74+
logger.info("Using Azure Cosmos DB endpoint: {}", AccountSettings.HOST);
7275

7376
// Create async client
7477
// <CreateAsyncClient>
@@ -106,29 +109,29 @@ private void getStartedDemo() throws Exception {
106109
johnsonFamilyItem,
107110
smithFamilyItem);
108111

109-
System.out.println("Reading items.");
112+
logger.info("Reading items.");
110113
readItems(familiesToCreate);
111114

112-
System.out.println("Querying items.");
115+
logger.info("Querying items.");
113116
queryItems();
114117
}
115118

116119
private void createDatabaseIfNotExists() throws Exception {
117-
System.out.println("Create database " + databaseName + " if not exists.");
120+
logger.info("Create database {} if not exists.", databaseName);
118121

119122
// Create database if not exists
120123
// <CreateDatabaseIfNotExists>
121124
Mono<CosmosDatabaseResponse> databaseResponseMono = client.createDatabaseIfNotExists(databaseName);
122125
databaseResponseMono.flatMap(databaseResponse -> {
123126
database = client.getDatabase(databaseResponse.getProperties().getId());
124-
System.out.println("Checking database " + database.getId() + " completed!\n");
127+
logger.info("Checking database {} completed!\n", database.getId());
125128
return Mono.empty();
126129
}).block();
127130
// </CreateDatabaseIfNotExists>
128131
}
129132

130133
private void createContainerIfNotExists() throws Exception {
131-
System.out.println("Create container " + containerName + " if not exists.");
134+
logger.info("Create container {} if not exists.", containerName);
132135

133136
// Create container if not exists
134137
// <CreateContainerIfNotExists>
@@ -139,7 +142,7 @@ private void createContainerIfNotExists() throws Exception {
139142
// Create container with 400 RU/s
140143
containerResponseMono.flatMap(containerResponse -> {
141144
container = database.getContainer(containerResponse.getProperties().getId());
142-
System.out.println("Checking container " + container.getId() + " completed!\n");
145+
logger.info("Checking container {} completed!\n", container.getId());
143146
return Mono.empty();
144147
}).block();
145148

@@ -150,46 +153,36 @@ private void createFamilies(Flux<Family> families) throws Exception {
150153

151154
// <CreateItem>
152155

153-
final CountDownLatch completionLatch = new CountDownLatch(1);
156+
try {
154157

155-
// Combine multiple item inserts, associated success println's, and a final aggregate stats println into one Reactive stream.
156-
families.flatMap(family -> {
158+
// Combine multiple item inserts, associated success println's, and a final aggregate stats println into one Reactive stream.
159+
double charge = families.flatMap(family -> {
157160
return container.createItem(family);
158161
}) //Flux of item request responses
159-
.flatMap(itemResponse -> {
160-
System.out.println(String.format("Created item with request charge of %.2f within" +
161-
" duration %s",
162-
itemResponse.getRequestCharge(), itemResponse.getDuration()));
163-
System.out.println(String.format("Item ID: %s\n", itemResponse.getItem().getId()));
164-
return Mono.just(itemResponse.getRequestCharge());
165-
}) //Flux of request charges
166-
.reduce(0.0,
167-
(charge_n,charge_nplus1) -> charge_n + charge_nplus1
168-
) //Mono of total charge - there will be only one item in this stream
169-
.subscribe(charge -> {
170-
System.out.println(String.format("Created items with total request charge of %.2f\n",
171-
charge));
172-
},
173-
err -> {
174-
if (err instanceof CosmosException) {
175-
//Client-specific errors
176-
CosmosException cerr = (CosmosException)err;
177-
cerr.printStackTrace();
178-
System.err.println(String.format("Read Item failed with %s\n", cerr));
179-
} else {
180-
//General errors
181-
err.printStackTrace();
182-
}
183-
184-
completionLatch.countDown();
185-
},
186-
() -> {completionLatch.countDown();}
187-
); //Preserve the total charge and print aggregate charge/item count stats.
188-
189-
try {
190-
completionLatch.await();
191-
} catch (InterruptedException err) {
192-
throw new AssertionError("Unexpected Interruption",err);
162+
.flatMap(itemResponse -> {
163+
logger.info("Created item with request charge of {}} within" +
164+
" duration {}",
165+
itemResponse.getRequestCharge(), itemResponse.getDuration());
166+
logger.info("Item ID: {}\n", itemResponse.getItem().getId());
167+
return Mono.just(itemResponse.getRequestCharge());
168+
}) //Flux of request charges
169+
.reduce(0.0,
170+
(charge_n, charge_nplus1) -> charge_n + charge_nplus1
171+
) //Mono of total charge - there will be only one item in this stream
172+
.block(); //Preserve the total charge and print aggregate charge/item count stats.
173+
174+
logger.info("Created items with total request charge of {}\n", charge);
175+
176+
} catch (Exception err) {
177+
if (err instanceof CosmosException) {
178+
//Client-specific errors
179+
CosmosException cerr = (CosmosException) err;
180+
cerr.printStackTrace();
181+
logger.error("Read Item failed with %s\n", cerr);
182+
} else {
183+
//General errors
184+
err.printStackTrace();
185+
}
193186
}
194187

195188
// </CreateItem>
@@ -200,39 +193,29 @@ private void readItems(Flux<Family> familiesToCreate) {
200193
// This will help fast look up of items because of partition key
201194
// <ReadItem>
202195

203-
final CountDownLatch completionLatch = new CountDownLatch(1);
204-
205-
familiesToCreate.flatMap(family -> {
206-
Mono<CosmosItemResponse<Family>> asyncItemResponseMono = container.readItem(family.getId(), new PartitionKey(family.getLastName()), Family.class);
207-
return asyncItemResponseMono;
208-
})
209-
.subscribe(
210-
itemResponse -> {
211-
double requestCharge = itemResponse.getRequestCharge();
212-
Duration requestLatency = itemResponse.getDuration();
213-
System.out.println(String.format("Item successfully read with id %s with a charge of %.2f and within duration %s",
214-
itemResponse.getItem().getId(), requestCharge, requestLatency));
215-
},
216-
err -> {
217-
if (err instanceof CosmosException) {
218-
//Client-specific errors
219-
CosmosException cerr = (CosmosException)err;
220-
cerr.printStackTrace();
221-
System.err.println(String.format("Read Item failed with %s\n", cerr));
222-
} else {
223-
//General errors
224-
err.printStackTrace();
225-
}
226-
227-
completionLatch.countDown();
228-
},
229-
() -> {completionLatch.countDown();}
230-
);
231-
232196
try {
233-
completionLatch.await();
234-
} catch (InterruptedException err) {
235-
throw new AssertionError("Unexpected Interruption",err);
197+
198+
familiesToCreate.flatMap(family -> {
199+
Mono<CosmosItemResponse<Family>> asyncItemResponseMono = container.readItem(family.getId(), new PartitionKey(family.getLastName()), Family.class);
200+
return asyncItemResponseMono;
201+
}).flatMap(itemResponse -> {
202+
double requestCharge = itemResponse.getRequestCharge();
203+
Duration requestLatency = itemResponse.getDuration();
204+
logger.info("Item successfully read with id {} with a charge of {} and within duration {}",
205+
itemResponse.getItem().getId(), requestCharge, requestLatency);
206+
return Flux.empty();
207+
}).blockLast();
208+
209+
} catch (Exception err) {
210+
if (err instanceof CosmosException) {
211+
//Client-specific errors
212+
CosmosException cerr = (CosmosException) err;
213+
cerr.printStackTrace();
214+
logger.error("Read Item failed with {}\n", cerr);
215+
} else {
216+
//General errors
217+
err.printStackTrace();
218+
}
236219
}
237220

238221
// </ReadItem>
@@ -242,47 +225,41 @@ private void queryItems() {
242225
// <QueryItems>
243226
// Set some common query options
244227

228+
int preferredPageSize = 10; // We'll use this later
229+
245230
CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
246-
// Set query metrics enabled to get metrics around query executions
231+
232+
// Set populate query metrics to get metrics around query executions
247233
queryOptions.setQueryMetricsEnabled(true);
248234

249235
CosmosPagedFlux<Family> pagedFluxResponse = container.queryItems(
250-
"SELECT * FROM Family WHERE Family.lastName IN ('Andersen', 'Wakefield', 'Johnson')", queryOptions, Family.class);
251-
252-
final CountDownLatch completionLatch = new CountDownLatch(1);
253-
254-
pagedFluxResponse.byPage(10).subscribe(
255-
fluxResponse -> {
256-
System.out.println("Got a page of query result with " +
257-
fluxResponse.getResults().size() + " items(s)"
258-
+ " and request charge of " + fluxResponse.getRequestCharge());
259-
260-
System.out.println("Item Ids " + fluxResponse
261-
.getResults()
262-
.stream()
263-
.map(Family::getId)
264-
.collect(Collectors.toList()));
265-
},
266-
err -> {
267-
if (err instanceof CosmosException) {
268-
//Client-specific errors
269-
CosmosException cerr = (CosmosException)err;
270-
cerr.printStackTrace();
271-
System.err.println(String.format("Read Item failed with %s\n", cerr));
272-
} else {
273-
//General errors
274-
err.printStackTrace();
275-
}
276-
277-
completionLatch.countDown();
278-
},
279-
() -> {completionLatch.countDown();}
280-
);
236+
"SELECT * FROM Family WHERE Family.lastName IN ('Andersen', 'Wakefield', 'Johnson')", queryOptions, Family.class);
281237

282238
try {
283-
completionLatch.await();
284-
} catch (InterruptedException err) {
285-
throw new AssertionError("Unexpected Interruption",err);
239+
240+
pagedFluxResponse.byPage(preferredPageSize).flatMap(fluxResponse -> {
241+
logger.info("Got a page of query result with {} items(s)"
242+
+ " and request charge of {}", fluxResponse.getResults().size(), fluxResponse.getRequestCharge());
243+
244+
logger.info("Item Ids {}" , fluxResponse
245+
.getResults()
246+
.stream()
247+
.map(Family::getId)
248+
.collect(Collectors.toList()));
249+
250+
return Flux.empty();
251+
}).blockLast();
252+
253+
} catch(Exception err) {
254+
if (err instanceof CosmosException) {
255+
//Client-specific errors
256+
CosmosException cerr = (CosmosException) err;
257+
cerr.printStackTrace();
258+
logger.error("Read Item failed with %s\n", cerr);
259+
} else {
260+
//General errors
261+
err.printStackTrace();
262+
}
286263
}
287264

288265
// </QueryItems>

0 commit comments

Comments
 (0)