3232
3333package org .opensearch .action .update ;
3434
35+ import org .apache .logging .log4j .LogManager ;
36+ import org .apache .logging .log4j .Logger ;
37+ import org .opensearch .action .ActionRunnable ;
3538import org .opensearch .action .RoutingMissingException ;
3639import org .opensearch .action .bulk .BulkItemResponse ;
3740import org .opensearch .action .bulk .BulkRequest ;
3841import org .opensearch .action .bulk .TransportBulkAction ;
42+ import org .opensearch .action .delete .DeleteRequest ;
43+ import org .opensearch .action .delete .DeleteResponse ;
44+ import org .opensearch .action .index .IndexRequest ;
45+ import org .opensearch .action .index .IndexResponse ;
3946import org .opensearch .action .support .ActionFilters ;
4047import org .opensearch .action .support .HandledTransportAction ;
4148import org .opensearch .action .support .WriteRequest ;
4249import org .opensearch .cluster .metadata .Metadata ;
50+ import org .opensearch .common .collect .Tuple ;
4351import org .opensearch .common .inject .Inject ;
52+ import org .opensearch .common .logging .DeprecationLogger ;
53+ import org .opensearch .common .settings .Settings ;
54+ import org .opensearch .common .util .concurrent .AbstractRunnable ;
55+ import org .opensearch .common .xcontent .XContentHelper ;
4456import org .opensearch .core .action .ActionListener ;
57+ import org .opensearch .core .common .bytes .BytesReference ;
58+ import org .opensearch .core .common .io .stream .NotSerializableExceptionWrapper ;
59+ import org .opensearch .core .index .shard .ShardId ;
60+ import org .opensearch .core .rest .RestStatus ;
61+ import org .opensearch .core .xcontent .MediaType ;
62+ import org .opensearch .index .IndexService ;
63+ import org .opensearch .index .IndexSettings ;
64+ import org .opensearch .index .engine .VersionConflictEngineException ;
65+ import org .opensearch .index .shard .IndexShard ;
66+ import org .opensearch .index .shard .IndexingStats ;
67+ import org .opensearch .indices .IndicesService ;
4568import org .opensearch .tasks .Task ;
69+ import org .opensearch .threadpool .ThreadPool ;
70+ import org .opensearch .transport .TransportChannel ;
71+ import org .opensearch .transport .TransportRequestHandler ;
4672import org .opensearch .transport .TransportService ;
73+ import org .opensearch .transport .client .node .NodeClient ;
74+
75+ import java .util .Map ;
76+
77+ import static org .opensearch .ExceptionsHelper .unwrapCause ;
78+ import static org .opensearch .action .bulk .TransportSingleItemBulkWriteAction .toSingleItemBulkRequest ;
79+ import static org .opensearch .action .bulk .TransportSingleItemBulkWriteAction .wrapBulkResponse ;
4780
4881/**
4982 * Performs the update operation by delegating to {@link TransportBulkAction} with a single update operation.
@@ -54,10 +87,32 @@ public class TransportUpdateAction extends HandledTransportAction<UpdateRequest,
5487
5588 private final TransportBulkAction bulkAction ;
5689
90+ // The following fields can be removed once we remove ShardTransportHandler.
91+ private static final Logger logger = LogManager .getLogger (TransportUpdateAction .class );
92+ private static final DeprecationLogger deprecationLogger = DeprecationLogger .getLogger (TransportUpdateAction .class );
93+ private static final String SHARD_ACTION_NAME = UpdateAction .NAME + "[s]" ;
94+ private final ThreadPool threadPool ;
95+ private final IndicesService indicesService ;
96+ private final UpdateHelper updateHelper ;
97+ private final NodeClient client ;
98+
5799 @ Inject
58- public TransportUpdateAction (TransportService transportService , ActionFilters actionFilters , TransportBulkAction bulkAction ) {
100+ public TransportUpdateAction (
101+ ThreadPool threadPool ,
102+ TransportService transportService ,
103+ ActionFilters actionFilters ,
104+ TransportBulkAction bulkAction ,
105+ IndicesService indicesService ,
106+ UpdateHelper updateHelper ,
107+ NodeClient client
108+ ) {
59109 super (UpdateAction .NAME , transportService , actionFilters , UpdateRequest ::new );
110+ this .threadPool = threadPool ;
60111 this .bulkAction = bulkAction ;
112+ this .indicesService = indicesService ;
113+ this .updateHelper = updateHelper ;
114+ this .client = client ;
115+ transportService .registerRequestHandler (SHARD_ACTION_NAME , ThreadPool .Names .SAME , UpdateRequest ::new , new ShardTransportHandler ());
61116 }
62117
63118 public static void resolveAndValidateRouting (Metadata metadata , String concreteIndex , UpdateRequest request ) {
@@ -88,4 +143,199 @@ protected void doExecute(Task task, UpdateRequest request, ActionListener<Update
88143 }
89144 }, listener ::onFailure ));
90145 }
146+
147+ /**
148+ * Transport handler per shard.
149+ *
150+ * @deprecated This only exists for BWC with 2.x. We can remove this when we release OpenSearch 4.0.
151+ * @opensearch.internal
152+ */
153+ @ Deprecated (forRemoval = true , since = "3.0" )
154+ private class ShardTransportHandler implements TransportRequestHandler <UpdateRequest > {
155+
156+ protected String executor (ShardId shardId ) {
157+ final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
158+ return indexService .getIndexSettings ().getIndexMetadata ().isSystem () ? ThreadPool .Names .SYSTEM_WRITE : ThreadPool .Names .WRITE ;
159+ }
160+
161+ protected void shardOperation (final UpdateRequest request , final ActionListener <UpdateResponse > listener , final int retryCount ) {
162+ final ShardId shardId = request .getShardId ();
163+ final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
164+ final IndexShard indexShard = indexService .getShard (shardId .getId ());
165+ final UpdateHelper .Result result = updateHelper .prepare (request , indexShard , threadPool ::absoluteTimeInMillis );
166+ switch (result .getResponseResult ()) {
167+ case CREATED :
168+ IndexRequest upsertRequest = result .action ();
169+ // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
170+ final BytesReference upsertSourceBytes = upsertRequest .source ();
171+ client .bulk (toSingleItemBulkRequest (upsertRequest ), wrapBulkResponse (ActionListener .<IndexResponse >wrap (response -> {
172+ UpdateResponse update = new UpdateResponse (
173+ response .getShardInfo (),
174+ response .getShardId (),
175+ response .getId (),
176+ response .getSeqNo (),
177+ response .getPrimaryTerm (),
178+ response .getVersion (),
179+ response .getResult ()
180+ );
181+ if (request .fetchSource () != null && request .fetchSource ().fetchSource ()) {
182+ Tuple <? extends MediaType , Map <String , Object >> sourceAndContent = XContentHelper .convertToMap (
183+ upsertSourceBytes ,
184+ true ,
185+ upsertRequest .getContentType ()
186+ );
187+ update .setGetResult (
188+ UpdateHelper .extractGetResult (
189+ request ,
190+ request .concreteIndex (),
191+ response .getSeqNo (),
192+ response .getPrimaryTerm (),
193+ response .getVersion (),
194+ sourceAndContent .v2 (),
195+ sourceAndContent .v1 (),
196+ upsertSourceBytes
197+ )
198+ );
199+ } else {
200+ update .setGetResult (null );
201+ }
202+ update .setForcedRefresh (response .forcedRefresh ());
203+ listener .onResponse (update );
204+ }, exception -> handleUpdateFailureWithRetry (listener , request , exception , retryCount ))));
205+
206+ break ;
207+ case UPDATED :
208+ IndexRequest indexRequest = result .action ();
209+ // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
210+ final BytesReference indexSourceBytes = indexRequest .source ();
211+ final Settings indexSettings = indexService .getIndexSettings ().getSettings ();
212+ if (IndexSettings .DEFAULT_PIPELINE .exists (indexSettings ) || IndexSettings .FINAL_PIPELINE .exists (indexSettings )) {
213+ deprecationLogger .deprecate (
214+ "update_operation_with_ingest_pipeline" ,
215+ "the index ["
216+ + indexRequest .index ()
217+ + "] has a default ingest pipeline or a final ingest pipeline, the support of the ingest pipelines for update operation causes unexpected result and will be removed in 3.0.0"
218+ );
219+ }
220+ client .bulk (toSingleItemBulkRequest (indexRequest ), wrapBulkResponse (ActionListener .<IndexResponse >wrap (response -> {
221+ UpdateResponse update = new UpdateResponse (
222+ response .getShardInfo (),
223+ response .getShardId (),
224+ response .getId (),
225+ response .getSeqNo (),
226+ response .getPrimaryTerm (),
227+ response .getVersion (),
228+ response .getResult ()
229+ );
230+ update .setGetResult (
231+ UpdateHelper .extractGetResult (
232+ request ,
233+ request .concreteIndex (),
234+ response .getSeqNo (),
235+ response .getPrimaryTerm (),
236+ response .getVersion (),
237+ result .updatedSourceAsMap (),
238+ result .updateSourceContentType (),
239+ indexSourceBytes
240+ )
241+ );
242+ update .setForcedRefresh (response .forcedRefresh ());
243+ listener .onResponse (update );
244+ }, exception -> handleUpdateFailureWithRetry (listener , request , exception , retryCount ))));
245+ break ;
246+ case DELETED :
247+ DeleteRequest deleteRequest = result .action ();
248+ client .bulk (toSingleItemBulkRequest (deleteRequest ), wrapBulkResponse (ActionListener .<DeleteResponse >wrap (response -> {
249+ UpdateResponse update = new UpdateResponse (
250+ response .getShardInfo (),
251+ response .getShardId (),
252+ response .getId (),
253+ response .getSeqNo (),
254+ response .getPrimaryTerm (),
255+ response .getVersion (),
256+ response .getResult ()
257+ );
258+ update .setGetResult (
259+ UpdateHelper .extractGetResult (
260+ request ,
261+ request .concreteIndex (),
262+ response .getSeqNo (),
263+ response .getPrimaryTerm (),
264+ response .getVersion (),
265+ result .updatedSourceAsMap (),
266+ result .updateSourceContentType (),
267+ null
268+ )
269+ );
270+ update .setForcedRefresh (response .forcedRefresh ());
271+ listener .onResponse (update );
272+ }, exception -> handleUpdateFailureWithRetry (listener , request , exception , retryCount ))));
273+ break ;
274+ case NOOP :
275+ UpdateResponse update = result .action ();
276+ IndexService indexServiceOrNull = indicesService .indexService (shardId .getIndex ());
277+ if (indexServiceOrNull != null ) {
278+ IndexShard shard = indexService .getShardOrNull (shardId .getId ());
279+ if (shard != null ) {
280+ shard .noopUpdate ();
281+ }
282+ }
283+
284+ IndexingStats .Stats .DocStatusStats stats = new IndexingStats .Stats .DocStatusStats ();
285+ stats .inc (RestStatus .OK );
286+
287+ indicesService .addDocStatusStats (stats );
288+ listener .onResponse (update );
289+
290+ break ;
291+ default :
292+ throw new IllegalStateException ("Illegal result " + result .getResponseResult ());
293+ }
294+ }
295+
296+ private void handleUpdateFailureWithRetry (
297+ final ActionListener <UpdateResponse > listener ,
298+ final UpdateRequest request ,
299+ final Exception failure ,
300+ int retryCount
301+ ) {
302+ final Throwable cause = unwrapCause (failure );
303+ if (cause instanceof VersionConflictEngineException ) {
304+ if (retryCount < request .retryOnConflict ()) {
305+ logger .trace (
306+ "Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]" ,
307+ retryCount + 1 ,
308+ request .retryOnConflict (),
309+ request .index (),
310+ request .getShardId (),
311+ request .id ()
312+ );
313+ threadPool .executor (executor (request .getShardId ()))
314+ .execute (ActionRunnable .wrap (listener , l -> shardOperation (request , l , retryCount + 1 )));
315+ return ;
316+ }
317+ }
318+ listener .onFailure (cause instanceof Exception ? (Exception ) cause : new NotSerializableExceptionWrapper (cause ));
319+ }
320+
321+ @ Override
322+ public void messageReceived (final UpdateRequest request , final TransportChannel channel , Task task ) throws Exception {
323+ threadPool .executor (executor (request .shardId ())).execute (new AbstractRunnable () {
324+ @ Override
325+ public void onFailure (Exception e ) {
326+ try {
327+ channel .sendResponse (e );
328+ } catch (Exception inner ) {
329+ inner .addSuppressed (e );
330+ logger .warn ("failed to send response for " + SHARD_ACTION_NAME , inner );
331+ }
332+ }
333+
334+ @ Override
335+ protected void doRun () {
336+ shardOperation (request , ActionListener .wrap (channel ::sendResponse , this ::onFailure ), 0 );
337+ }
338+ });
339+ }
340+ }
91341}
0 commit comments