Skip to content

Commit dae97b6

Browse files
committed
Adding unit tests
1 parent 0b81ad8 commit dae97b6

File tree

10 files changed

+669
-31
lines changed

10 files changed

+669
-31
lines changed

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ public void syncSegmentsUploadedToRemoteStoreWithActiveMergesSegmentRegistry(Dir
714714
}
715715

716716
private boolean isRemoteStoreFileName(String name) {
717-
// TODO@kheraadi: Do we have a better way to check this?
717+
// TODO: Do we have a better way to check this?
718718
return name.contains(SEGMENT_NAME_UUID_SEPARATOR);
719719
}
720720

server/src/main/java/org/opensearch/indices/replication/ActiveMergesSegmentRegistry.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Map;
1616
import java.util.Set;
1717
import java.util.concurrent.ConcurrentHashMap;
18+
import java.util.concurrent.locks.ReentrantLock;
1819

1920
/**
2021
* Registry to track active merge segments and their metadata.
@@ -24,7 +25,8 @@
2425
* */
2526
public class ActiveMergesSegmentRegistry {
2627
private final Map<String, UploadedSegmentMetadata> segmentMetadataMap = new ConcurrentHashMap<>();
27-
private final Set<String> filenameRegistry = ConcurrentHashMap.newKeySet();
28+
final Set<String> filenameRegistry = ConcurrentHashMap.newKeySet(); // package-private for tests
29+
private final ReentrantLock lock = new ReentrantLock();
2830

2931
private static class HOLDER {
3032
private static final ActiveMergesSegmentRegistry INSTANCE = new ActiveMergesSegmentRegistry();
@@ -41,10 +43,15 @@ public static ActiveMergesSegmentRegistry getInstance() {
4143
* @param localSegmentFilename Segment filename in the local store
4244
*/
4345
public void register(@NonNull String localSegmentFilename) {
44-
if (contains(localSegmentFilename)){
45-
throw new IllegalArgumentException(localSegmentFilename + "is already registered. Cannot reregister.");
46+
lock.lock();
47+
try {
48+
if (contains(localSegmentFilename)){
49+
throw new IllegalArgumentException(localSegmentFilename + " is already registered. Cannot reregister.");
50+
}
51+
filenameRegistry.add(localSegmentFilename);
52+
} finally {
53+
lock.unlock();
4654
}
47-
filenameRegistry.add(localSegmentFilename);
4855
}
4956

5057
/**
@@ -53,34 +60,44 @@ public void register(@NonNull String localSegmentFilename) {
5360
* @param metadata {@link UploadedSegmentMetadata} for the segment file
5461
*/
5562
public void updateMetadata(@NonNull String localSegmentFilename, @NonNull UploadedSegmentMetadata metadata) {
56-
if (contains(localSegmentFilename) == false) {
57-
throw new IllegalArgumentException("Segment " + localSegmentFilename + " is not registered");
63+
lock.lock();
64+
try {
65+
if (contains(localSegmentFilename) == false) {
66+
throw new IllegalArgumentException("Segment " + localSegmentFilename + " is not registered");
67+
}
68+
segmentMetadataMap.put(localSegmentFilename, metadata);
69+
filenameRegistry.add(metadata.getUploadedFilename());
70+
} finally {
71+
lock.unlock();
5872
}
59-
segmentMetadataMap.put(localSegmentFilename, metadata);
6073
}
6174

6275
/**
6376
* Unregisters a segment file from the registry.
6477
* @param segmentFilename Segment filename in local store
6578
*/
6679
public void unregister(@NonNull String segmentFilename) {
67-
synchronized(filenameRegistry) {
68-
filenameRegistry.remove(getExistingRemoteSegmentFilename(segmentFilename));
80+
lock.lock();
81+
try {
82+
if (segmentMetadataMap.containsKey(segmentFilename)) {
83+
String remoteFilename = segmentMetadataMap.get(segmentFilename).getUploadedFilename();
84+
filenameRegistry.remove(remoteFilename);
85+
}
6986
filenameRegistry.remove(segmentFilename);
87+
segmentMetadataMap.remove(segmentFilename);
88+
} finally {
89+
lock.unlock();
7090
}
71-
segmentMetadataMap.remove(segmentFilename);
7291
}
7392

7493
public boolean contains(@NonNull String segmentFilename) {
7594
return filenameRegistry.contains(segmentFilename);
7695
}
7796

7897
public String getExistingRemoteSegmentFilename(@NonNull String localSegmentFilename) {
79-
if (segmentMetadataMap.containsKey(localSegmentFilename)== false) {
80-
// This should never happen
98+
if (segmentMetadataMap.containsKey(localSegmentFilename) == false) {
8199
throw new IllegalArgumentException("Metadata for segment " + localSegmentFilename + " is not available.");
82100
}
83-
84101
return segmentMetadataMap.get(localSegmentFilename).getUploadedFilename();
85102
}
86103

server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishMergedSegmentActionProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public MergedSegmentPublisher.PublishAction get() {
5353
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING) == false) {
5454
return null;
5555
}
56-
// TODO@kheraadi: FIX THIS
56+
// TODO: FIX THIS
5757
if (false) {// || clusterService.localNode().isRemoteStoreNode() == false) {
5858
return new PublishMergedSegmentAction(
5959
settings, transportService, clusterService, indicesService,

server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private void publishMergedSegmentsToRemoteStore(IndexShard indexShard, RemoteSto
115115

116116
final CountDownLatch latch = new CountDownLatch(segmentsToUpload.size());
117117

118-
// TODO@kheraadi: Upload in low priority
118+
// TODO: Upload in low priority
119119
remoteStoreUploaderService.uploadSegments(
120120
segmentsToUpload,
121121
segmentsSizeMap,
@@ -153,23 +153,21 @@ public void onSuccess(String file) {
153153
public void onFailure(String file) {
154154
segmentsToUpload.forEach(activeMergesSegmentRegistry::unregister);
155155
/**
156-
* TODO@kheraadi:
157-
* 1. reset ActiveMergesRegistry
158-
* 2. abort merge
156+
* TODO: abort merge
159157
*/
160158
}
161159
}
162160
);
163161
try {
164-
if(latch.await(60, TimeUnit.MINUTES) == false) {throw new RuntimeException("Merged segment upload timed out.");}; // TODO@kheraadi: Finalize timeout
162+
if(latch.await(60, TimeUnit.MINUTES) == false) {throw new RuntimeException("Merged segment upload timed out.");}; // TODO: Finalize timeout
165163
} catch (InterruptedException e) {
166164
throw new RuntimeException(e);
167-
// TODO@kheraadi: abort merge properly here
165+
// TODO: abort merge properly here
168166
}
169167
}
170168

171169
/**
172-
* TODO@kheraadi: REBASE ONCE UPLOAD CHANGES ARE COMPLETE
170+
* TODO: REBASE ONCE UPLOAD CHANGES ARE COMPLETE
173171
*/
174172
private RemoteStoreUploaderService getRemoteStoreUploaderService(IndexShard indexShard) {
175173
return new RemoteStoreUploaderService(
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package org.opensearch.indices.replication;
2+
3+
import org.junit.After;
4+
import org.junit.Before;
5+
import org.junit.BeforeClass;
6+
import org.junit.Test;
7+
import org.junit.runner.OrderWith;
8+
import org.opensearch.index.store.RemoteSegmentStoreDirectory.UploadedSegmentMetadata;
9+
10+
import java.util.Map;
11+
import java.util.concurrent.CountDownLatch;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.TimeUnit;
15+
16+
import static org.junit.Assert.*;
17+
import static org.mockito.Mockito.*;
18+
19+
public class ActiveMergesSegmentRegistryTests {
20+
21+
private ActiveMergesSegmentRegistry registry;
22+
private UploadedSegmentMetadata mockMetadata;
23+
24+
@Before
25+
public void setUp() {
26+
registry = ActiveMergesSegmentRegistry.getInstance();
27+
// Clear registry state before each test
28+
clearRegistry();
29+
30+
mockMetadata = mock(UploadedSegmentMetadata.class);
31+
when(mockMetadata.getUploadedFilename()).thenReturn("remote_segment_1.si");
32+
}
33+
34+
private void clearRegistry() {
35+
// Clear all registered segments
36+
Map<String, UploadedSegmentMetadata> metadataMap = registry.segmentMetadataMap();
37+
metadataMap.keySet().forEach(registry::unregister);
38+
registry.filenameRegistry.clear();
39+
}
40+
41+
@Test
42+
public void testSingletonInstance() {
43+
ActiveMergesSegmentRegistry instance1 = ActiveMergesSegmentRegistry.getInstance();
44+
ActiveMergesSegmentRegistry instance2 = ActiveMergesSegmentRegistry.getInstance();
45+
assertSame(instance1, instance2);
46+
}
47+
48+
@Test
49+
public void testRegisterSegment() {
50+
String filename = "segment_1.si";
51+
registry.register(filename);
52+
assertTrue(registry.contains(filename));
53+
}
54+
55+
@Test(expected = IllegalArgumentException.class)
56+
public void testRegisterDuplicateSegment() {
57+
String filename = "segment_1.si";
58+
registry.register(filename);
59+
registry.register(filename); // Should throw exception
60+
}
61+
62+
@Test
63+
public void testUpdateMetadata() {
64+
String filename = "segment_1.si";
65+
registry.register(filename);
66+
registry.updateMetadata(filename, mockMetadata);
67+
68+
assertEquals(mockMetadata, registry.getMetadata(filename));
69+
assertTrue(registry.contains("remote_segment_1.si"));
70+
}
71+
72+
@Test(expected = IllegalArgumentException.class)
73+
public void testUpdateMetadataUnregisteredSegment() {
74+
registry.updateMetadata("unregistered_segment.si", mockMetadata);
75+
}
76+
77+
@Test
78+
public void testUnregisterSegment() {
79+
String filename = "segment_1.si";
80+
registry.register(filename);
81+
registry.updateMetadata(filename, mockMetadata);
82+
83+
registry.unregister(filename);
84+
85+
assertFalse(registry.contains(filename));
86+
assertFalse(registry.contains("remote_segment_1.si"));
87+
assertNull(registry.getMetadata(filename));
88+
}
89+
90+
@Test
91+
public void testUnregisterNonExistentSegment() {
92+
// Should not throw exception
93+
registry.unregister("non_existent.si");
94+
}
95+
96+
@Test
97+
public void testGetExistingRemoteSegmentFilename() {
98+
String filename = "segment_1.si";
99+
registry.register(filename);
100+
registry.updateMetadata(filename, mockMetadata);
101+
102+
assertEquals("remote_segment_1.si", registry.getExistingRemoteSegmentFilename(filename));
103+
}
104+
105+
@Test(expected = IllegalArgumentException.class)
106+
public void testGetExistingRemoteSegmentFilenameNoMetadata() {
107+
String filename = "segment_1.si";
108+
registry.register(filename);
109+
registry.getExistingRemoteSegmentFilename(filename); // Metadata not available
110+
}
111+
112+
@Test
113+
public void testCanDelete() {
114+
String filename = "segment_1.si";
115+
assertTrue(registry.canDelete(filename)); // Not registered
116+
117+
registry.register(filename);
118+
assertFalse(registry.canDelete(filename)); // Registered
119+
120+
registry.unregister(filename);
121+
assertTrue(registry.canDelete(filename)); // Unregistered
122+
}
123+
124+
@Test
125+
public void testConcurrentAccess() throws InterruptedException {
126+
int threadCount = 10;
127+
int operationsPerThread = 100;
128+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
129+
CountDownLatch latch = new CountDownLatch(threadCount);
130+
131+
for (int i = 0; i < threadCount; i++) {
132+
final int threadId = i;
133+
executor.submit(() -> {
134+
try {
135+
for (int j = 0; j < operationsPerThread; j++) {
136+
String filename = "segment_" + threadId + "_" + j + ".si";
137+
String remoteFilename = "remote_" + filename;
138+
UploadedSegmentMetadata metadata = mock(UploadedSegmentMetadata.class);
139+
when(metadata.getUploadedFilename()).thenReturn(remoteFilename);
140+
registry.register(filename);
141+
assertTrue(registry.contains(filename));
142+
registry.updateMetadata(filename, metadata);
143+
assertEquals(registry.getExistingRemoteSegmentFilename(filename), remoteFilename);
144+
registry.unregister(filename);
145+
assertFalse(registry.contains(filename));
146+
}
147+
} finally {
148+
latch.countDown();
149+
}
150+
});
151+
}
152+
153+
assertTrue(latch.await(30, TimeUnit.SECONDS));
154+
executor.shutdown();
155+
}
156+
157+
@Test
158+
public void testMultipleSegmentsLifecycle() {
159+
String[] filenames = {"seg1.si", "seg2.si", "seg3.si"};
160+
UploadedSegmentMetadata[] metadatas = new UploadedSegmentMetadata[3];
161+
162+
// Setup mocks
163+
for (int i = 0; i < 3; i++) {
164+
metadatas[i] = mock(UploadedSegmentMetadata.class);
165+
when(metadatas[i].getUploadedFilename()).thenReturn("remote_" + filenames[i]);
166+
}
167+
168+
// Register all
169+
for (String filename : filenames) {
170+
registry.register(filename);
171+
assertTrue(registry.contains(filename));
172+
}
173+
174+
// Update metadata
175+
for (int i = 0; i < 3; i++) {
176+
registry.updateMetadata(filenames[i], metadatas[i]);
177+
assertEquals(metadatas[i], registry.getMetadata(filenames[i]));
178+
}
179+
180+
// Verify all are tracked
181+
assertEquals(3, registry.segmentMetadataMap().size());
182+
183+
// Unregister one
184+
registry.unregister(filenames[1]);
185+
assertFalse(registry.contains(filenames[1]));
186+
assertEquals(2, registry.segmentMetadataMap().size());
187+
188+
// Others still exist
189+
assertTrue(registry.contains(filenames[0]));
190+
assertTrue(registry.contains(filenames[2]));
191+
}
192+
}

0 commit comments

Comments
 (0)