2424
2525import java .io .IOException ;
2626import java .net .UnknownHostException ;
27+ import java .security .KeyManagementException ;
28+ import java .security .NoSuchAlgorithmException ;
29+ import java .security .cert .CertificateException ;
30+ import java .security .cert .X509Certificate ;
2731import java .util .ArrayList ;
2832import java .util .HashMap ;
2933import java .util .HashSet ;
3438import java .util .concurrent .ArrayBlockingQueue ;
3539import java .util .concurrent .BlockingQueue ;
3640
41+ import javax .net .SocketFactory ;
42+ import javax .net .ssl .SSLContext ;
43+ import javax .net .ssl .SSLSocketFactory ;
44+ import javax .net .ssl .TrustManager ;
45+ import javax .net .ssl .X509TrustManager ;
46+
3747import org .bson .BasicBSONObject ;
3848import org .bson .types .BSONTimestamp ;
3949import org .bson .types .ObjectId ;
7787import com .mongodb .Mongo ;
7888import com .mongodb .MongoClient ;
7989import com .mongodb .MongoClientOptions ;
90+ import com .mongodb .MongoClientOptions .Builder ;
8091import com .mongodb .MongoException ;
8192import com .mongodb .MongoInterruptedException ;
8293import com .mongodb .QueryOperators ;
@@ -104,6 +115,8 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
104115 public final static String PORT_FIELD = "port" ;
105116 public final static String OPTIONS_FIELD = "options" ;
106117 public final static String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference" ;
118+ public final static String SSL_CONNECTION_FIELD = "ssl" ;
119+ public final static String SSL_VERIFY_CERT_FIELD = "sslverifycertificate" ;
107120 public final static String DROP_COLLECTION_FIELD = "drop_collection" ;
108121 public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields" ;
109122 public final static String FILTER_FIELD = "filter" ;
@@ -164,6 +177,8 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
164177 protected final String mongoLocalPassword ;
165178 protected final String mongoOplogNamespace ;
166179 protected final boolean mongoSecondaryReadPreference ;
180+ protected final boolean mongoUseSSL ;
181+ protected final boolean mongoSSLVerifyCertificate ;
167182
168183 protected final String indexName ;
169184 protected final String typeName ;
@@ -182,6 +197,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
182197 // private final TransferQueue<Map<String, Object>> stream = new
183198 // LinkedTransferQueue<Map<String, Object>>();
184199 private final BlockingQueue <Map <String , Object >> stream ;
200+ private SocketFactory sslSocketFactory ;
185201
186202 private Mongo mongo ;
187203 private DB adminDb ;
@@ -250,7 +266,11 @@ public MongoDBRiver(final RiverName riverName,
250266 .get (SECONDARY_READ_PREFERENCE_FIELD ), false );
251267 dropCollection = XContentMapValues .nodeBooleanValue (
252268 mongoOptionsSettings .get (DROP_COLLECTION_FIELD ), false );
253-
269+ mongoUseSSL = XContentMapValues .nodeBooleanValue (
270+ mongoOptionsSettings .get (SSL_CONNECTION_FIELD ), false );
271+ mongoSSLVerifyCertificate = XContentMapValues .nodeBooleanValue (
272+ mongoOptionsSettings .get (SSL_VERIFY_CERT_FIELD ), true );
273+
254274 if (mongoOptionsSettings .containsKey (EXCLUDE_FIELDS_FIELD )) {
255275 excludeFields = new HashSet <String >();
256276 Object excludeFieldsSettings = mongoOptionsSettings
@@ -274,6 +294,8 @@ public MongoDBRiver(final RiverName riverName,
274294 mongoSecondaryReadPreference = false ;
275295 dropCollection = false ;
276296 excludeFields = null ;
297+ mongoUseSSL = false ;
298+ mongoSSLVerifyCertificate = false ;
277299 }
278300
279301 // Credentials
@@ -377,6 +399,8 @@ public MongoDBRiver(final RiverName riverName,
377399 script = null ;
378400 dropCollection = false ;
379401 excludeFields = null ;
402+ mongoUseSSL = false ;
403+ mongoSSLVerifyCertificate = false ;
380404 }
381405 mongoOplogNamespace = mongoDb + "." + mongoCollection ;
382406
@@ -555,10 +579,16 @@ && getAdminDb().isAuthenticated()) {
555579
556580 private Mongo getMongoClient () {
557581 if (mongo == null ) {
558- // TODO: MongoClientOptions should be configurable
559- MongoClientOptions mco = MongoClientOptions .builder ()
582+
583+ Builder builder = MongoClientOptions .builder ()
560584 .autoConnectRetry (true ).connectTimeout (15000 )
561- .socketTimeout (60000 ).build ();
585+ .socketKeepAlive (true ).socketTimeout (60000 );
586+ if (mongoUseSSL ){
587+ builder .socketFactory (getSSLSocketFactory ());
588+ }
589+
590+ // TODO: MongoClientOptions should be configurable
591+ MongoClientOptions mco = builder .build ();
562592 mongo = new MongoClient (mongoServers , mco );
563593 }
564594 return mongo ;
@@ -595,6 +625,42 @@ public void close() {
595625 indexerThread .interrupt ();
596626 }
597627 }
628+
629+ private SocketFactory getSSLSocketFactory () {
630+ if (sslSocketFactory != null )
631+ return sslSocketFactory ;
632+
633+ if (!mongoSSLVerifyCertificate ) {
634+ try {
635+ final TrustManager [] trustAllCerts = new TrustManager [] { new X509TrustManager () {
636+
637+ @ Override
638+ public X509Certificate [] getAcceptedIssuers () {
639+ return null ;
640+ }
641+
642+ @ Override
643+ public void checkServerTrusted (X509Certificate [] chain , String authType )
644+ throws CertificateException {
645+ }
646+
647+ @ Override
648+ public void checkClientTrusted (X509Certificate [] chain , String authType )
649+ throws CertificateException {
650+ }
651+ }};
652+ final SSLContext sslContext = SSLContext .getInstance ( "SSL" );
653+ sslContext .init ( null , trustAllCerts , new java .security .SecureRandom () );
654+ // Create an ssl socket factory with our all-trusting manager
655+ sslSocketFactory = sslContext .getSocketFactory ();
656+ return sslSocketFactory ;
657+ } catch (Exception ex ) {
658+ logger .error ("Unable to build ssl socket factory without certificate validation, using default instead." , ex );
659+ }
660+ }
661+ sslSocketFactory = SSLSocketFactory .getDefault ();
662+ return sslSocketFactory ;
663+ }
598664
599665 private class Indexer implements Runnable {
600666
@@ -963,7 +1029,16 @@ private boolean assignCollections() {
9631029
9641030 @ Override
9651031 public void run () {
966- mongo = new MongoClient (mongoServers );
1032+ Builder builder = MongoClientOptions .builder ()
1033+ .autoConnectRetry (true ).connectTimeout (15000 )
1034+ .socketKeepAlive (true ).socketTimeout (60000 );
1035+ if (mongoUseSSL ){
1036+ builder .socketFactory (getSSLSocketFactory ());
1037+ }
1038+
1039+ // TODO: MongoClientOptions should be configurable
1040+ MongoClientOptions mco = builder .build ();
1041+ mongo = new MongoClient (mongoServers , mco );
9671042
9681043 if (mongoSecondaryReadPreference ) {
9691044 mongo .setReadPreference (ReadPreference .secondaryPreferred ());
0 commit comments