3232
3333package org .opensearch .action .update ;
3434
35+ import org .apache .logging .log4j .LogManager ;
36+ import org .apache .logging .log4j .Logger ;
37+ import org .opensearch .action .ActionRunnable ;
3538import org .opensearch .action .RoutingMissingException ;
3639import org .opensearch .action .bulk .BulkItemResponse ;
3740import org .opensearch .action .bulk .BulkRequest ;
3841import org .opensearch .action .bulk .TransportBulkAction ;
42+ import org .opensearch .action .delete .DeleteRequest ;
43+ import org .opensearch .action .delete .DeleteResponse ;
44+ import org .opensearch .action .index .IndexRequest ;
45+ import org .opensearch .action .index .IndexResponse ;
3946import org .opensearch .action .support .ActionFilters ;
4047import org .opensearch .action .support .HandledTransportAction ;
4148import org .opensearch .action .support .WriteRequest ;
4249import org .opensearch .cluster .metadata .Metadata ;
50+ import org .opensearch .common .collect .Tuple ;
4351import org .opensearch .common .inject .Inject ;
52+ import org .opensearch .common .logging .DeprecationLogger ;
53+ import org .opensearch .common .settings .Settings ;
54+ import org .opensearch .common .util .concurrent .AbstractRunnable ;
55+ import org .opensearch .common .xcontent .XContentHelper ;
4456import org .opensearch .core .action .ActionListener ;
57+ import org .opensearch .core .common .bytes .BytesReference ;
58+ import org .opensearch .core .common .io .stream .NotSerializableExceptionWrapper ;
59+ import org .opensearch .core .index .shard .ShardId ;
60+ import org .opensearch .core .rest .RestStatus ;
61+ import org .opensearch .core .xcontent .MediaType ;
62+ import org .opensearch .index .IndexService ;
63+ import org .opensearch .index .IndexSettings ;
64+ import org .opensearch .index .engine .VersionConflictEngineException ;
65+ import org .opensearch .index .shard .IndexShard ;
66+ import org .opensearch .index .shard .IndexingStats ;
67+ import org .opensearch .indices .IndicesService ;
4568import org .opensearch .tasks .Task ;
69+ import org .opensearch .threadpool .ThreadPool ;
70+ import org .opensearch .transport .TransportChannel ;
71+ import org .opensearch .transport .TransportRequestHandler ;
4672import org .opensearch .transport .TransportService ;
73+ import org .opensearch .transport .client .node .NodeClient ;
74+
75+ import java .util .Map ;
76+
77+ import static org .opensearch .ExceptionsHelper .unwrapCause ;
78+ import static org .opensearch .action .bulk .TransportSingleItemBulkWriteAction .toSingleItemBulkRequest ;
79+ import static org .opensearch .action .bulk .TransportSingleItemBulkWriteAction .wrapBulkResponse ;
4780
4881/**
4982 * Performs the update operation by delegating to {@link TransportBulkAction} with a single update operation.
@@ -54,10 +87,32 @@ public class TransportUpdateAction extends HandledTransportAction<UpdateRequest,
5487
5588 private final TransportBulkAction bulkAction ;
5689
90+ // The following fields can be removed once we remove ShardTransportHandler.
91+ private static final Logger logger = LogManager .getLogger (TransportUpdateAction .class );
92+ private static final DeprecationLogger deprecationLogger = DeprecationLogger .getLogger (TransportUpdateAction .class );
93+ private static final String SHARD_ACTION_NAME = UpdateAction .NAME + "[s]" ;
94+ private final ThreadPool threadPool ;
95+ private final IndicesService indicesService ;
96+ private final UpdateHelper updateHelper ;
97+ private final NodeClient client ;
98+
5799 @ Inject
58- public TransportUpdateAction (TransportService transportService , ActionFilters actionFilters , TransportBulkAction bulkAction ) {
100+ public TransportUpdateAction (
101+ ThreadPool threadPool ,
102+ TransportService transportService ,
103+ ActionFilters actionFilters ,
104+ TransportBulkAction bulkAction ,
105+ IndicesService indicesService ,
106+ UpdateHelper updateHelper ,
107+ NodeClient client
108+ ) {
59109 super (UpdateAction .NAME , transportService , actionFilters , UpdateRequest ::new );
110+ this .threadPool = threadPool ;
60111 this .bulkAction = bulkAction ;
112+ this .indicesService = indicesService ;
113+ this .updateHelper = updateHelper ;
114+ this .client = client ;
115+ transportService .registerRequestHandler (SHARD_ACTION_NAME , ThreadPool .Names .SAME , UpdateRequest ::new , new ShardTransportHandler ());
61116 }
62117
63118 public static void resolveAndValidateRouting (Metadata metadata , String concreteIndex , UpdateRequest request ) {
@@ -88,4 +143,198 @@ protected void doExecute(Task task, UpdateRequest request, ActionListener<Update
88143 }
89144 }, listener ::onFailure ));
90145 }
146+
147+ /**
148+ * Transport handler per shard.
149+ *
150+ * @deprecated This only exists for BWC with 2.x. We can remove this when we release OpenSearch 4.0.
151+ * @opensearch.internal
152+ */
153+ private class ShardTransportHandler implements TransportRequestHandler <UpdateRequest > {
154+
155+ protected String executor (ShardId shardId ) {
156+ final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
157+ return indexService .getIndexSettings ().getIndexMetadata ().isSystem () ? ThreadPool .Names .SYSTEM_WRITE : ThreadPool .Names .WRITE ;
158+ }
159+
160+ protected void shardOperation (final UpdateRequest request , final ActionListener <UpdateResponse > listener , final int retryCount ) {
161+ final ShardId shardId = request .getShardId ();
162+ final IndexService indexService = indicesService .indexServiceSafe (shardId .getIndex ());
163+ final IndexShard indexShard = indexService .getShard (shardId .getId ());
164+ final UpdateHelper .Result result = updateHelper .prepare (request , indexShard , threadPool ::absoluteTimeInMillis );
165+ switch (result .getResponseResult ()) {
166+ case CREATED :
167+ IndexRequest upsertRequest = result .action ();
168+ // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
169+ final BytesReference upsertSourceBytes = upsertRequest .source ();
170+ client .bulk (toSingleItemBulkRequest (upsertRequest ), wrapBulkResponse (ActionListener .<IndexResponse >wrap (response -> {
171+ UpdateResponse update = new UpdateResponse (
172+ response .getShardInfo (),
173+ response .getShardId (),
174+ response .getId (),
175+ response .getSeqNo (),
176+ response .getPrimaryTerm (),
177+ response .getVersion (),
178+ response .getResult ()
179+ );
180+ if (request .fetchSource () != null && request .fetchSource ().fetchSource ()) {
181+ Tuple <? extends MediaType , Map <String , Object >> sourceAndContent = XContentHelper .convertToMap (
182+ upsertSourceBytes ,
183+ true ,
184+ upsertRequest .getContentType ()
185+ );
186+ update .setGetResult (
187+ UpdateHelper .extractGetResult (
188+ request ,
189+ request .concreteIndex (),
190+ response .getSeqNo (),
191+ response .getPrimaryTerm (),
192+ response .getVersion (),
193+ sourceAndContent .v2 (),
194+ sourceAndContent .v1 (),
195+ upsertSourceBytes
196+ )
197+ );
198+ } else {
199+ update .setGetResult (null );
200+ }
201+ update .setForcedRefresh (response .forcedRefresh ());
202+ listener .onResponse (update );
203+ }, exception -> handleUpdateFailureWithRetry (listener , request , exception , retryCount ))));
204+
205+ break ;
206+ case UPDATED :
207+ IndexRequest indexRequest = result .action ();
208+ // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
209+ final BytesReference indexSourceBytes = indexRequest .source ();
210+ final Settings indexSettings = indexService .getIndexSettings ().getSettings ();
211+ if (IndexSettings .DEFAULT_PIPELINE .exists (indexSettings ) || IndexSettings .FINAL_PIPELINE .exists (indexSettings )) {
212+ deprecationLogger .deprecate (
213+ "update_operation_with_ingest_pipeline" ,
214+ "the index ["
215+ + indexRequest .index ()
216+ + "] has a default ingest pipeline or a final ingest pipeline, the support of the ingest pipelines for update operation causes unexpected result and will be removed in 3.0.0"
217+ );
218+ }
219+ client .bulk (toSingleItemBulkRequest (indexRequest ), wrapBulkResponse (ActionListener .<IndexResponse >wrap (response -> {
220+ UpdateResponse update = new UpdateResponse (
221+ response .getShardInfo (),
222+ response .getShardId (),
223+ response .getId (),
224+ response .getSeqNo (),
225+ response .getPrimaryTerm (),
226+ response .getVersion (),
227+ response .getResult ()
228+ );
229+ update .setGetResult (
230+ UpdateHelper .extractGetResult (
231+ request ,
232+ request .concreteIndex (),
233+ response .getSeqNo (),
234+ response .getPrimaryTerm (),
235+ response .getVersion (),
236+ result .updatedSourceAsMap (),
237+ result .updateSourceContentType (),
238+ indexSourceBytes
239+ )
240+ );
241+ update .setForcedRefresh (response .forcedRefresh ());
242+ listener .onResponse (update );
243+ }, exception -> handleUpdateFailureWithRetry (listener , request , exception , retryCount ))));
244+ break ;
245+ case DELETED :
246+ DeleteRequest deleteRequest = result .action ();
247+ client .bulk (toSingleItemBulkRequest (deleteRequest ), wrapBulkResponse (ActionListener .<DeleteResponse >wrap (response -> {
248+ UpdateResponse update = new UpdateResponse (
249+ response .getShardInfo (),
250+ response .getShardId (),
251+ response .getId (),
252+ response .getSeqNo (),
253+ response .getPrimaryTerm (),
254+ response .getVersion (),
255+ response .getResult ()
256+ );
257+ update .setGetResult (
258+ UpdateHelper .extractGetResult (
259+ request ,
260+ request .concreteIndex (),
261+ response .getSeqNo (),
262+ response .getPrimaryTerm (),
263+ response .getVersion (),
264+ result .updatedSourceAsMap (),
265+ result .updateSourceContentType (),
266+ null
267+ )
268+ );
269+ update .setForcedRefresh (response .forcedRefresh ());
270+ listener .onResponse (update );
271+ }, exception -> handleUpdateFailureWithRetry (listener , request , exception , retryCount ))));
272+ break ;
273+ case NOOP :
274+ UpdateResponse update = result .action ();
275+ IndexService indexServiceOrNull = indicesService .indexService (shardId .getIndex ());
276+ if (indexServiceOrNull != null ) {
277+ IndexShard shard = indexService .getShardOrNull (shardId .getId ());
278+ if (shard != null ) {
279+ shard .noopUpdate ();
280+ }
281+ }
282+
283+ IndexingStats .Stats .DocStatusStats stats = new IndexingStats .Stats .DocStatusStats ();
284+ stats .inc (RestStatus .OK );
285+
286+ indicesService .addDocStatusStats (stats );
287+ listener .onResponse (update );
288+
289+ break ;
290+ default :
291+ throw new IllegalStateException ("Illegal result " + result .getResponseResult ());
292+ }
293+ }
294+
295+ private void handleUpdateFailureWithRetry (
296+ final ActionListener <UpdateResponse > listener ,
297+ final UpdateRequest request ,
298+ final Exception failure ,
299+ int retryCount
300+ ) {
301+ final Throwable cause = unwrapCause (failure );
302+ if (cause instanceof VersionConflictEngineException ) {
303+ if (retryCount < request .retryOnConflict ()) {
304+ logger .trace (
305+ "Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]" ,
306+ retryCount + 1 ,
307+ request .retryOnConflict (),
308+ request .index (),
309+ request .getShardId (),
310+ request .id ()
311+ );
312+ threadPool .executor (executor (request .getShardId ()))
313+ .execute (ActionRunnable .wrap (listener , l -> shardOperation (request , l , retryCount + 1 )));
314+ return ;
315+ }
316+ }
317+ listener .onFailure (cause instanceof Exception ? (Exception ) cause : new NotSerializableExceptionWrapper (cause ));
318+ }
319+
320+ @ Override
321+ public void messageReceived (final UpdateRequest request , final TransportChannel channel , Task task ) throws Exception {
322+ threadPool .executor (executor (request .shardId ())).execute (new AbstractRunnable () {
323+ @ Override
324+ public void onFailure (Exception e ) {
325+ try {
326+ channel .sendResponse (e );
327+ } catch (Exception inner ) {
328+ inner .addSuppressed (e );
329+ logger .warn ("failed to send response for " + SHARD_ACTION_NAME , inner );
330+ }
331+ }
332+
333+ @ Override
334+ protected void doRun () {
335+ shardOperation (request , ActionListener .wrap (channel ::sendResponse , this ::onFailure ), 0 );
336+ }
337+ });
338+ }
339+ }
91340}
0 commit comments