88
99package org .opensearch .search .pipeline .common ;
1010
11- import org .opensearch .action .admin .indices .delete .DeleteIndexRequest ;
1211import org .opensearch .action .admin .indices .refresh .RefreshRequest ;
1312import org .opensearch .action .admin .indices .refresh .RefreshResponse ;
13+ import org .opensearch .action .admin .indices .settings .put .UpdateSettingsRequest ;
1414import org .opensearch .action .index .IndexRequest ;
1515import org .opensearch .action .index .IndexResponse ;
1616import org .opensearch .action .search .DeleteSearchPipelineRequest ;
17+ import org .opensearch .action .search .GetSearchPipelineRequest ;
18+ import org .opensearch .action .search .GetSearchPipelineResponse ;
1719import org .opensearch .action .search .PutSearchPipelineRequest ;
1820import org .opensearch .action .search .SearchRequest ;
1921import org .opensearch .action .search .SearchResponse ;
2022import org .opensearch .action .support .master .AcknowledgedResponse ;
23+ import org .opensearch .common .settings .Settings ;
2124import org .opensearch .core .common .bytes .BytesArray ;
25+ import org .opensearch .core .common .bytes .BytesReference ;
2226import org .opensearch .core .rest .RestStatus ;
2327import org .opensearch .core .xcontent .MediaTypeRegistry ;
2428import org .opensearch .index .query .MatchAllQueryBuilder ;
29+ import org .opensearch .ingest .PipelineConfiguration ;
2530import org .opensearch .plugins .Plugin ;
2631import org .opensearch .search .builder .SearchSourceBuilder ;
2732import org .opensearch .test .OpenSearchIntegTestCase ;
33+ import org .junit .After ;
34+ import org .junit .Before ;
2835
2936import java .util .Collection ;
37+ import java .util .HashMap ;
3038import java .util .List ;
3139import java .util .Map ;
3240
3341@ OpenSearchIntegTestCase .SuiteScopeTestCase
3442public class SearchPipelineCommonIT extends OpenSearchIntegTestCase {
3543
44+ private static final String TEST_INDEX = "myindex" ;
45+ private static final String PIPELINE_NAME = "test_pipeline" ;
46+
3647 @ Override
3748 protected Collection <Class <? extends Plugin >> nodePlugins () {
3849 return List .of (SearchPipelineCommonModulePlugin .class );
3950 }
4051
52+ @ Before
53+ public void setup () throws Exception {
54+ createIndex (TEST_INDEX );
55+
56+ IndexRequest doc1 = new IndexRequest (TEST_INDEX ).id ("doc1" ).source (Map .of ("field" , "value" ));
57+ IndexRequest doc2 = new IndexRequest (TEST_INDEX ).id ("doc2" ).source (Map .of ("field" , "something else" ));
58+
59+ IndexResponse ir = client ().index (doc1 ).actionGet ();
60+ assertSame (RestStatus .CREATED , ir .status ());
61+ ir = client ().index (doc2 ).actionGet ();
62+ assertSame (RestStatus .CREATED , ir .status ());
63+
64+ RefreshResponse refRsp = client ().admin ().indices ().refresh (new RefreshRequest (TEST_INDEX )).actionGet ();
65+ assertSame (RestStatus .OK , refRsp .getStatus ());
66+ }
67+
68+ @ After
69+ public void cleanup () throws Exception {
70+ internalCluster ().wipeIndices (TEST_INDEX );
71+ }
72+
4173 public void testFilterQuery () {
4274 // Create a pipeline with a filter_query processor.
43- String pipelineName = "foo" ;
75+ createPipeline ();
76+
77+ // Search without the pipeline. Should see both documents.
78+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
79+ SearchResponse rsp = client ().search (req ).actionGet ();
80+ assertEquals (2 , rsp .getHits ().getTotalHits ().value );
81+
82+ // Search with the pipeline. Should only see document with "field":"value".
83+ req .pipeline (PIPELINE_NAME );
84+ rsp = client ().search (req ).actionGet ();
85+ assertEquals (1 , rsp .getHits ().getTotalHits ().value );
86+
87+ // Clean up.
88+ deletePipeline ();
89+ }
90+
91+ public void testSearchWithTemporaryPipeline () throws Exception {
92+
93+ // Search without the pipeline. Should see both documents.
94+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
95+ SearchResponse rsp = client ().search (req ).actionGet ();
96+ assertEquals (2 , rsp .getHits ().getTotalHits ().value );
97+
98+ // Search with temporary pipeline
99+ Map <String , Object > pipelineSourceMap = new HashMap <>();
100+ Map <String , Object > requestProcessorConfig = new HashMap <>();
101+
102+ Map <String , Object > filterQuery = new HashMap <>();
103+ filterQuery .put ("query" , Map .of ("term" , Map .of ("field" , "value" )));
104+ requestProcessorConfig .put ("filter_query" , filterQuery );
105+ pipelineSourceMap .put ("request_processors" , List .of (requestProcessorConfig ));
106+
107+ req = new SearchRequest (TEST_INDEX ).source (
108+ new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()).searchPipelineSource (pipelineSourceMap )
109+ );
110+
111+ SearchResponse rspWithTempPipeline = client ().search (req ).actionGet ();
112+ assertEquals (1 , rspWithTempPipeline .getHits ().getTotalHits ().value );
113+ }
114+
115+ public void testSearchWithDefaultPipeline () throws Exception {
116+ // Create pipeline
117+ createPipeline ();
118+
119+ // Search without the pipeline. Should see both documents.
120+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
121+ SearchResponse rsp = client ().search (req ).actionGet ();
122+ assertEquals (2 , rsp .getHits ().getTotalHits ().value );
123+
124+ // Set pipeline as default for the index
125+ UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest (TEST_INDEX );
126+ updateSettingsRequest .settings (Settings .builder ().put ("index.search.default_pipeline" , PIPELINE_NAME ));
127+ AcknowledgedResponse updateSettingsResponse = client ().admin ().indices ().updateSettings (updateSettingsRequest ).actionGet ();
128+ assertTrue (updateSettingsResponse .isAcknowledged ());
129+
130+ // Search with the default pipeline. Should only see document with "field":"value".
131+ rsp = client ().search (req ).actionGet ();
132+ assertEquals (1 , rsp .getHits ().getTotalHits ().value );
133+
134+ // Clean up: Remove default pipeline setting
135+ updateSettingsRequest = new UpdateSettingsRequest (TEST_INDEX );
136+ updateSettingsRequest .settings (Settings .builder ().putNull ("index.search.default_pipeline" ));
137+ updateSettingsResponse = client ().admin ().indices ().updateSettings (updateSettingsRequest ).actionGet ();
138+ assertTrue (updateSettingsResponse .isAcknowledged ());
139+
140+ // Clean up.
141+ deletePipeline ();
142+ }
143+
144+ public void testUpdateSearchPipeline () throws Exception {
145+ // Create initial pipeline
146+ createPipeline ();
147+
148+ // Verify initial pipeline
149+ SearchRequest req = new SearchRequest (TEST_INDEX ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
150+ req .pipeline (PIPELINE_NAME );
151+ SearchResponse initialRsp = client ().search (req ).actionGet ();
152+ assertEquals (1 , initialRsp .getHits ().getTotalHits ().value );
153+
154+ BytesReference pipelineConfig = new BytesArray (
155+ "{"
156+ + "\" description\" : \" Updated pipeline\" ,"
157+ + "\" request_processors\" : ["
158+ + "{"
159+ + "\" filter_query\" : {"
160+ + "\" query\" : {"
161+ + "\" term\" : {"
162+ + "\" field\" : \" something else\" "
163+ + "}"
164+ + "}"
165+ + "}"
166+ + "}"
167+ + "]"
168+ + "}"
169+ );
170+
171+ PipelineConfiguration pipeline = new PipelineConfiguration (PIPELINE_NAME , pipelineConfig , MediaTypeRegistry .JSON );
172+
173+ // Update pipeline
174+ PutSearchPipelineRequest updateRequest = new PutSearchPipelineRequest (pipeline .getId (), pipelineConfig , MediaTypeRegistry .JSON );
175+ AcknowledgedResponse ackRsp = client ().admin ().cluster ().putSearchPipeline (updateRequest ).actionGet ();
176+ assertTrue (ackRsp .isAcknowledged ());
177+
178+ // Verify pipeline description
179+ GetSearchPipelineResponse getPipelineResponse = client ().admin ()
180+ .cluster ()
181+ .getSearchPipeline (new GetSearchPipelineRequest (PIPELINE_NAME ))
182+ .actionGet ();
183+ assertEquals (PIPELINE_NAME , getPipelineResponse .pipelines ().get (0 ).getId ());
184+ assertEquals (pipeline .getConfigAsMap (), getPipelineResponse .pipelines ().get (0 ).getConfigAsMap ());
185+ // Clean up.
186+ deletePipeline ();
187+ }
188+
189+ private void createPipeline () {
44190 PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest (
45- pipelineName ,
191+ PIPELINE_NAME ,
46192 new BytesArray (
47193 "{"
48194 + "\" request_processors\" : ["
@@ -62,35 +208,13 @@ public void testFilterQuery() {
62208 );
63209 AcknowledgedResponse ackRsp = client ().admin ().cluster ().putSearchPipeline (putSearchPipelineRequest ).actionGet ();
64210 assertTrue (ackRsp .isAcknowledged ());
211+ }
65212
66- // Index some documents.
67- String indexName = "myindex" ;
68- IndexRequest doc1 = new IndexRequest (indexName ).id ("doc1" ).source (Map .of ("field" , "value" ));
69- IndexRequest doc2 = new IndexRequest (indexName ).id ("doc2" ).source (Map .of ("field" , "something else" ));
70-
71- IndexResponse ir = client ().index (doc1 ).actionGet ();
72- assertSame (RestStatus .CREATED , ir .status ());
73- ir = client ().index (doc2 ).actionGet ();
74- assertSame (RestStatus .CREATED , ir .status ());
75-
76- // Refresh so the documents are visible to search.
77- RefreshResponse refRsp = client ().admin ().indices ().refresh (new RefreshRequest (indexName )).actionGet ();
78- assertSame (RestStatus .OK , refRsp .getStatus ());
79-
80- // Search without the pipeline. Should see both documents.
81- SearchRequest req = new SearchRequest (indexName ).source (new SearchSourceBuilder ().query (new MatchAllQueryBuilder ()));
82- SearchResponse rsp = client ().search (req ).actionGet ();
83- assertEquals (2 , rsp .getHits ().getTotalHits ().value );
84-
85- // Search with the pipeline. Should only see document with "field":"value".
86- req .pipeline (pipelineName );
87- rsp = client ().search (req ).actionGet ();
88- assertEquals (1 , rsp .getHits ().getTotalHits ().value );
89-
90- // Clean up.
91- ackRsp = client ().admin ().cluster ().deleteSearchPipeline (new DeleteSearchPipelineRequest (pipelineName )).actionGet ();
92- assertTrue (ackRsp .isAcknowledged ());
93- ackRsp = client ().admin ().indices ().delete (new DeleteIndexRequest (indexName )).actionGet ();
213+ private void deletePipeline () {
214+ AcknowledgedResponse ackRsp = client ().admin ()
215+ .cluster ()
216+ .deleteSearchPipeline (new DeleteSearchPipelineRequest (PIPELINE_NAME ))
217+ .actionGet ();
94218 assertTrue (ackRsp .isAcknowledged ());
95219 }
96220}
0 commit comments