Skip to content

Commit 4538201

Browse files
committed
chore: Rewirte lambda in McpAsyncServer with local methods.
Signed-off-by: He-Pin <[email protected]>
1 parent 7163ec6 commit 4538201

File tree

1 file changed

+137
-139
lines changed

1 file changed

+137
-139
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 137 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public class McpAsyncServer {
158158
private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
159159
Map<String, McpNotificationHandler> notificationHandlers = new HashMap<>();
160160

161-
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());
161+
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, this::handleNotificationInitialized);
162162

163163
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
164164
.rootsChangeConsumers();
@@ -173,45 +173,58 @@ private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServe
173173
return notificationHandlers;
174174
}
175175

176-
private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
177-
Map<String, McpRequestHandler<?>> requestHandlers = new HashMap<>();
176+
protected Mono<Void> handleNotificationInitialized(final McpAsyncServerExchange exchange, final Object params) {
177+
return Mono.just(Map.of()).then();
178+
}
178179

180+
@SuppressWarnings("rawtypes")
181+
private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
182+
final Map<String, McpRequestHandler<?>> requestHandlers = new HashMap<>();
179183
// Initialize request handlers for standard MCP methods
180184

181185
// Ping MUST respond with an empty data, but not NULL response.
182-
requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of()));
186+
requestHandlers.put(McpSchema.METHOD_PING, (McpRequestHandler) this::handlePing);
183187

184188
// Add tools API handlers if the tool capability is enabled
185189
if (this.serverCapabilities.tools() != null) {
186-
requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler());
187-
requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler());
190+
requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, (McpRequestHandler) this::handleListTools);
191+
requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, (McpRequestHandler) this::handleToolCall);
188192
}
189193

190194
// Add resources API handlers if provided
191195
if (this.serverCapabilities.resources() != null) {
192-
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());
193-
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());
194-
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());
196+
requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, (McpRequestHandler) this::handleListResources);
197+
requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, (McpRequestHandler) this::handleReadResources);
198+
requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST,
199+
(McpRequestHandler) this::handleListResourceTemplates);
195200
}
196201

197202
// Add prompts API handlers if provider exists
198203
if (this.serverCapabilities.prompts() != null) {
199-
requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler());
200-
requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler());
204+
requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, (McpRequestHandler) this::handleListPrompts);
205+
requestHandlers.put(McpSchema.METHOD_PROMPT_GET, (McpRequestHandler) this::handleGetPrompt);
201206
}
202207

203208
// Add logging API handlers if the logging capability is enabled
204209
if (this.serverCapabilities.logging() != null) {
205-
requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler());
210+
requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, (McpRequestHandler) this::handleSetLogger);
206211
}
207212

208213
// Add completion API handlers if the completion capability is enabled
209214
if (this.serverCapabilities.completions() != null) {
210-
requestHandlers.put(McpSchema.METHOD_COMPLETION_COMPLETE, completionCompleteRequestHandler());
215+
requestHandlers.put(McpSchema.METHOD_COMPLETION_COMPLETE,
216+
(McpRequestHandler) this::handleCompletionComplete);
211217
}
212218
return requestHandlers;
213219
}
214220

221+
/**
222+
* Handle the ping request.
223+
*/
224+
protected Mono<Map<String, Object>> handlePing(McpAsyncServerExchange exchange, Object params) {
225+
return Mono.just(Map.of());
226+
}
227+
215228
// ---------------------------------------
216229
// Lifecycle Management
217230
// ---------------------------------------
@@ -472,31 +485,27 @@ public Mono<Void> notifyToolsListChanged() {
472485
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, null);
473486
}
474487

475-
private McpRequestHandler<McpSchema.ListToolsResult> toolsListRequestHandler() {
476-
return (exchange, params) -> {
477-
List<Tool> tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();
488+
protected Mono<ListToolsResult> handleListTools(final McpAsyncServerExchange exchange, final Object params) {
489+
List<Tool> tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();
478490

479-
return Mono.just(new McpSchema.ListToolsResult(tools, null));
480-
};
491+
return Mono.just(new McpSchema.ListToolsResult(tools, null));
481492
}
482493

483-
private McpRequestHandler<CallToolResult> toolsCallRequestHandler() {
484-
return (exchange, params) -> {
485-
McpSchema.CallToolRequest callToolRequest = objectMapper.convertValue(params,
486-
new TypeReference<McpSchema.CallToolRequest>() {
487-
});
494+
protected Mono<CallToolResult> handleToolCall(final McpAsyncServerExchange exchange, final Object params) {
495+
McpSchema.CallToolRequest callToolRequest = objectMapper.convertValue(params,
496+
new TypeReference<McpSchema.CallToolRequest>() {
497+
});
488498

489-
Optional<McpServerFeatures.AsyncToolSpecification> toolSpecification = this.tools.stream()
490-
.filter(tr -> callToolRequest.name().equals(tr.tool().name()))
491-
.findAny();
499+
Optional<McpServerFeatures.AsyncToolSpecification> toolSpecification = this.tools.stream()
500+
.filter(tr -> callToolRequest.name().equals(tr.tool().name()))
501+
.findAny();
492502

493-
if (toolSpecification.isEmpty()) {
494-
return Mono.error(new McpError(new JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS,
495-
"Unknown tool: invalid_tool_name", "Tool not found: " + callToolRequest.name())));
496-
}
503+
if (toolSpecification.isEmpty()) {
504+
return Mono.error(new McpError(new JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INVALID_PARAMS,
505+
"Unknown tool: invalid_tool_name", "Tool not found: " + callToolRequest.name())));
506+
}
497507

498-
return toolSpecification.get().callHandler().apply(exchange, callToolRequest);
499-
};
508+
return toolSpecification.get().callHandler().apply(exchange, callToolRequest);
500509
}
501510

502511
// ---------------------------------------
@@ -573,21 +582,19 @@ public Mono<Void> notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification
573582
resourcesUpdatedNotification);
574583
}
575584

576-
private McpRequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
577-
return (exchange, params) -> {
578-
var resourceList = this.resources.values()
579-
.stream()
580-
.map(McpServerFeatures.AsyncResourceSpecification::resource)
581-
.filter(resource -> !resource.uri().contains("{"))
582-
.toList();
583-
return Mono.just(new McpSchema.ListResourcesResult(resourceList, null));
584-
};
585+
protected Mono<ListResourcesResult> handleListResources(final McpAsyncServerExchange exchange,
586+
final Object params) {
587+
var resourceList = this.resources.values()
588+
.stream()
589+
.map(McpServerFeatures.AsyncResourceSpecification::resource)
590+
.filter(resource -> !resource.uri().contains("{"))
591+
.toList();
592+
return Mono.just(new McpSchema.ListResourcesResult(resourceList, null));
585593
}
586594

587-
private McpRequestHandler<McpSchema.ListResourceTemplatesResult> resourceTemplateListRequestHandler() {
588-
return (exchange, params) -> Mono
589-
.just(new McpSchema.ListResourceTemplatesResult(this.getResourceTemplates(), null));
590-
595+
protected Mono<ListResourceTemplatesResult> handleListResourceTemplates(final McpAsyncServerExchange exchange,
596+
final Object params) {
597+
return Mono.just(new McpSchema.ListResourceTemplatesResult(this.getResourceTemplates(), null));
591598
}
592599

593600
private List<McpSchema.ResourceTemplate> getResourceTemplates() {
@@ -608,23 +615,21 @@ private List<McpSchema.ResourceTemplate> getResourceTemplates() {
608615
return list;
609616
}
610617

611-
private McpRequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHandler() {
612-
return (exchange, params) -> {
613-
McpSchema.ReadResourceRequest resourceRequest = objectMapper.convertValue(params,
614-
new TypeReference<McpSchema.ReadResourceRequest>() {
615-
});
616-
var resourceUri = resourceRequest.uri();
618+
protected Mono<ReadResourceResult> handleReadResources(final McpAsyncServerExchange exchange, final Object params) {
619+
McpSchema.ReadResourceRequest resourceRequest = objectMapper.convertValue(params,
620+
new TypeReference<McpSchema.ReadResourceRequest>() {
621+
});
622+
var resourceUri = resourceRequest.uri();
617623

618-
McpServerFeatures.AsyncResourceSpecification specification = this.resources.values()
619-
.stream()
620-
.filter(resourceSpecification -> this.uriTemplateManagerFactory
621-
.create(resourceSpecification.resource().uri())
622-
.matches(resourceUri))
623-
.findFirst()
624-
.orElseThrow(() -> new McpError("Resource not found: " + resourceUri));
624+
McpServerFeatures.AsyncResourceSpecification specification = this.resources.values()
625+
.stream()
626+
.filter(resourceSpecification -> this.uriTemplateManagerFactory
627+
.create(resourceSpecification.resource().uri())
628+
.matches(resourceUri))
629+
.findFirst()
630+
.orElseThrow(() -> new McpError("Resource not found: " + resourceUri));
625631

626-
return Mono.defer(() -> specification.readHandler().apply(exchange, resourceRequest));
627-
};
632+
return Mono.defer(() -> specification.readHandler().apply(exchange, resourceRequest));
628633
}
629634

630635
// ---------------------------------------
@@ -701,36 +706,32 @@ public Mono<Void> notifyPromptsListChanged() {
701706
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED, null);
702707
}
703708

704-
private McpRequestHandler<McpSchema.ListPromptsResult> promptsListRequestHandler() {
705-
return (exchange, params) -> {
706-
// TODO: Implement pagination
707-
// McpSchema.PaginatedRequest request = objectMapper.convertValue(params,
708-
// new TypeReference<McpSchema.PaginatedRequest>() {
709-
// });
709+
protected Mono<ListPromptsResult> handleListPrompts(final McpAsyncServerExchange exchange, final Object params) {
710+
// TODO: Implement pagination
711+
// McpSchema.PaginatedRequest request = objectMapper.convertValue(params,
712+
// new TypeReference<McpSchema.PaginatedRequest>() {
713+
// });
710714

711-
var promptList = this.prompts.values()
712-
.stream()
713-
.map(McpServerFeatures.AsyncPromptSpecification::prompt)
714-
.toList();
715+
var promptList = this.prompts.values()
716+
.stream()
717+
.map(McpServerFeatures.AsyncPromptSpecification::prompt)
718+
.toList();
715719

716-
return Mono.just(new McpSchema.ListPromptsResult(promptList, null));
717-
};
720+
return Mono.just(new McpSchema.ListPromptsResult(promptList, null));
718721
}
719722

720-
private McpRequestHandler<McpSchema.GetPromptResult> promptsGetRequestHandler() {
721-
return (exchange, params) -> {
722-
McpSchema.GetPromptRequest promptRequest = objectMapper.convertValue(params,
723-
new TypeReference<McpSchema.GetPromptRequest>() {
724-
});
723+
protected Mono<GetPromptResult> handleGetPrompt(final McpAsyncServerExchange exchange, final Object params) {
724+
McpSchema.GetPromptRequest promptRequest = objectMapper.convertValue(params,
725+
new TypeReference<McpSchema.GetPromptRequest>() {
726+
});
725727

726-
// Implement prompt retrieval logic here
727-
McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
728-
if (specification == null) {
729-
return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
730-
}
728+
// Implement prompt retrieval logic here
729+
McpServerFeatures.AsyncPromptSpecification specification = this.prompts.get(promptRequest.name());
730+
if (specification == null) {
731+
return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
732+
}
731733

732-
return Mono.defer(() -> specification.promptHandler().apply(exchange, promptRequest));
733-
};
734+
return Mono.defer(() -> specification.promptHandler().apply(exchange, promptRequest));
734735
}
735736

736737
// ---------------------------------------
@@ -763,79 +764,76 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
763764
loggingMessageNotification);
764765
}
765766

766-
private McpRequestHandler<Object> setLoggerRequestHandler() {
767-
return (exchange, params) -> {
768-
return Mono.defer(() -> {
767+
protected Mono<Map<String, Object>> handleSetLogger(final McpAsyncServerExchange exchange, final Object params) {
768+
return Mono.defer(() -> {
769769

770-
SetLevelRequest newMinLoggingLevel = objectMapper.convertValue(params,
771-
new TypeReference<SetLevelRequest>() {
772-
});
770+
SetLevelRequest newMinLoggingLevel = objectMapper.convertValue(params,
771+
new TypeReference<SetLevelRequest>() {
772+
});
773773

774-
exchange.setMinLoggingLevel(newMinLoggingLevel.level());
774+
exchange.setMinLoggingLevel(newMinLoggingLevel.level());
775775

776-
// FIXME: this field is deprecated and should be removed together
777-
// with the broadcasting loggingNotification.
778-
this.minLoggingLevel = newMinLoggingLevel.level();
776+
// FIXME: this field is deprecated and should be removed together
777+
// with the broadcasting loggingNotification.
778+
this.minLoggingLevel = newMinLoggingLevel.level();
779779

780-
return Mono.just(Map.of());
781-
});
782-
};
780+
return Mono.just(Map.of());
781+
});
783782
}
784783

785-
private McpRequestHandler<McpSchema.CompleteResult> completionCompleteRequestHandler() {
786-
return (exchange, params) -> {
787-
McpSchema.CompleteRequest request = parseCompletionParams(params);
784+
protected Mono<CompleteResult> handleCompletionComplete(final McpAsyncServerExchange exchange,
785+
final Object params) {
786+
McpSchema.CompleteRequest request = parseCompletionParams(params);
788787

789-
if (request.ref() == null) {
790-
return Mono.error(new McpError("ref must not be null"));
791-
}
788+
if (request.ref() == null) {
789+
return Mono.error(new McpError("ref must not be null"));
790+
}
792791

793-
if (request.ref().type() == null) {
794-
return Mono.error(new McpError("type must not be null"));
795-
}
792+
if (request.ref().type() == null) {
793+
return Mono.error(new McpError("type must not be null"));
794+
}
796795

797-
String type = request.ref().type();
796+
String type = request.ref().type();
798797

799-
String argumentName = request.argument().name();
798+
String argumentName = request.argument().name();
800799

801-
// check if the referenced resource exists
802-
if (type.equals("ref/prompt") && request.ref() instanceof McpSchema.PromptReference promptReference) {
803-
McpServerFeatures.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
804-
if (promptSpec == null) {
805-
return Mono.error(new McpError("Prompt not found: " + promptReference.name()));
806-
}
807-
if (!promptSpec.prompt()
808-
.arguments()
809-
.stream()
810-
.filter(arg -> arg.name().equals(argumentName))
811-
.findFirst()
812-
.isPresent()) {
813-
814-
return Mono.error(new McpError("Argument not found: " + argumentName));
815-
}
800+
// check if the referenced resource exists
801+
if (type.equals("ref/prompt") && request.ref() instanceof McpSchema.PromptReference promptReference) {
802+
McpServerFeatures.AsyncPromptSpecification promptSpec = this.prompts.get(promptReference.name());
803+
if (promptSpec == null) {
804+
return Mono.error(new McpError("Prompt not found: " + promptReference.name()));
816805
}
806+
if (!promptSpec.prompt()
807+
.arguments()
808+
.stream()
809+
.filter(arg -> arg.name().equals(argumentName))
810+
.findFirst()
811+
.isPresent()) {
817812

818-
if (type.equals("ref/resource") && request.ref() instanceof McpSchema.ResourceReference resourceReference) {
819-
McpServerFeatures.AsyncResourceSpecification resourceSpec = this.resources.get(resourceReference.uri());
820-
if (resourceSpec == null) {
821-
return Mono.error(new McpError("Resource not found: " + resourceReference.uri()));
822-
}
823-
if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri())
824-
.getVariableNames()
825-
.contains(argumentName)) {
826-
return Mono.error(new McpError("Argument not found: " + argumentName));
827-
}
813+
return Mono.error(new McpError("Argument not found: " + argumentName));
814+
}
815+
}
828816

817+
if (type.equals("ref/resource") && request.ref() instanceof McpSchema.ResourceReference resourceReference) {
818+
McpServerFeatures.AsyncResourceSpecification resourceSpec = this.resources.get(resourceReference.uri());
819+
if (resourceSpec == null) {
820+
return Mono.error(new McpError("Resource not found: " + resourceReference.uri()));
821+
}
822+
if (!uriTemplateManagerFactory.create(resourceSpec.resource().uri())
823+
.getVariableNames()
824+
.contains(argumentName)) {
825+
return Mono.error(new McpError("Argument not found: " + argumentName));
829826
}
830827

831-
McpServerFeatures.AsyncCompletionSpecification specification = this.completions.get(request.ref());
828+
}
832829

833-
if (specification == null) {
834-
return Mono.error(new McpError("AsyncCompletionSpecification not found: " + request.ref()));
835-
}
830+
McpServerFeatures.AsyncCompletionSpecification specification = this.completions.get(request.ref());
836831

837-
return Mono.defer(() -> specification.completionHandler().apply(exchange, request));
838-
};
832+
if (specification == null) {
833+
return Mono.error(new McpError("AsyncCompletionSpecification not found: " + request.ref()));
834+
}
835+
836+
return Mono.defer(() -> specification.completionHandler().apply(exchange, request));
839837
}
840838

841839
/**

0 commit comments

Comments
 (0)