3030import io .opensergo .util .AssertUtils ;
3131import io .opensergo .util .IdentifierUtils ;
3232
33+ import java .util .Optional ;
34+ import java .util .concurrent .TimeUnit ;
3335import java .util .concurrent .atomic .AtomicInteger ;
3436
3537/**
@@ -46,6 +48,7 @@ public class OpenSergoClient implements AutoCloseable {
4648 private final SubscribeRegistry subscribeRegistry ;
4749
4850 private AtomicInteger reqId ;
51+ protected volatile OpensergoClientStatus status ;
4952
5053 public OpenSergoClient (String host , int port ) {
5154 this .channel = ManagedChannelBuilder .forAddress (host , port )
@@ -56,17 +59,68 @@ public OpenSergoClient(String host, int port) {
5659 this .configCache = new SubscribedConfigCache ();
5760 this .subscribeRegistry = new SubscribeRegistry ();
5861 this .reqId = new AtomicInteger (0 );
62+ status = OpensergoClientStatus .INITIAL ;
63+ }
64+
65+ public void registerSubscribeInfo (OpensergoClientSubscribeInfo subscribeInfo ) {
66+ // Register subscriber to local.
67+ if (Optional .of (subscribeInfo .getSubscriberList ()).isPresent () && subscribeInfo .getSubscriberList ().size () > 0 ) {
68+ subscribeInfo .getSubscriberList ().forEach (subscriber -> {
69+ this .subscribeRegistry .registerSubscriber (subscribeInfo .getSubscribeKey (), subscriber );
70+ OpenSergoLogger .info ("OpenSergo subscribeinfo registered, subscribeKey={}, subscriber={}" , subscribeInfo .getSubscribeKey (), subscriber );
71+
72+ if (requestAndResponseWriter != null && this .status == OpensergoClientStatus .STARTED ) {
73+ this .subscribeConfig (subscribeInfo .getSubscribeKey ());
74+ }
75+ });
76+ }
5977 }
6078
6179 public void start () throws Exception {
80+ OpenSergoLogger .info ("OpensergoClient is starting..." );
81+
82+ if (status == OpensergoClientStatus .INITIAL ) {
83+ OpenSergoLogger .info ("open keepavlive thread" );
84+ new Thread (this ::keepAlive ).start ();
85+ }
86+
87+ status = OpensergoClientStatus .STARTING ;
88+
6289 this .requestAndResponseWriter = transportGrpcStub .withWaitForReady ()
63- .subscribeConfig (new OpenSergoSubscribeClientObserver (configCache , subscribeRegistry ));
90+ .subscribeConfig (new OpenSergoSubscribeClientObserver (this ));
91+
92+ OpenSergoLogger .info ("begin to subscribe config-data..." );
93+ this .subscribeRegistry .getSubscriberKeysAll ().forEach (subscribeKey -> {
94+ this .subscribeConfig (subscribeKey );
95+ });
96+
97+ OpenSergoLogger .info ("openSergoClient is started" );
98+ status = OpensergoClientStatus .STARTED ;
99+ }
100+
101+ private void keepAlive () {
102+ try {
103+ if (status != OpensergoClientStatus .STARTING
104+ && status != OpensergoClientStatus .STARTED
105+ && status != OpensergoClientStatus .SHUTDOWN ) {
106+ OpenSergoLogger .info ("try to restart openSergoClient..." );
107+ this .start ();
108+ }
109+ Thread .sleep (TimeUnit .SECONDS .toMillis (10 ));
110+ if ( status != OpensergoClientStatus .SHUTDOWN ) {
111+ keepAlive ();
112+ }
113+ } catch (Exception e ) {
114+ e .printStackTrace ();
115+ }
64116 }
65117
66118 @ Override
67119 public void close () throws Exception {
68120 requestAndResponseWriter .onCompleted ();
69121
122+ status = OpensergoClientStatus .SHUTDOWN ;
123+
70124 // gracefully drain the requests, then close the connection
71125 channel .shutdown ();
72126 }
@@ -77,8 +131,8 @@ public boolean unsubscribeConfig(SubscribeKey subscribeKey) {
77131 AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
78132
79133 if (requestAndResponseWriter == null ) {
80- // TODO: return status that indicates not ready
81- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
134+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
135+ status = OpensergoClientStatus . INTERRUPTED ;
82136 }
83137 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
84138 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -106,8 +160,8 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
106160 AssertUtils .assertNotNull (subscribeKey .getKind (), "kind cannot be null" );
107161
108162 if (requestAndResponseWriter == null ) {
109- // TODO: return status that indicates not ready
110- throw new IllegalStateException ( "gRPC stream is not ready" ) ;
163+ OpenSergoLogger . error ( "Fatal error occurred on OpenSergo gRPC ClientObserver" , new IllegalStateException ( "gRPC stream is not ready" ));
164+ status = OpensergoClientStatus . INTERRUPTED ;
111165 }
112166 SubscribeRequestTarget subTarget = SubscribeRequestTarget .newBuilder ()
113167 .setNamespace (subscribeKey .getNamespace ()).setApp (subscribeKey .getApp ())
@@ -121,18 +175,15 @@ public boolean subscribeConfig(SubscribeKey subscribeKey, OpenSergoConfigSubscri
121175 // Send SubscribeRequest
122176 requestAndResponseWriter .onNext (request );
123177
124- // Register subscriber to local.
125- if (subscriber != null ) {
126- subscribeRegistry .registerSubscriber (subscribeKey , subscriber );
127- OpenSergoLogger .info ("OpenSergo config subscriber registered, subscribeKey={}, subscriber={}" ,
128- subscribeKey , subscriber );
129- }
130-
131178 return true ;
132179 }
133180
134181 public SubscribedConfigCache getConfigCache () {
135182 return configCache ;
136183 }
137184
185+ public SubscribeRegistry getSubscribeRegistry () {
186+ return subscribeRegistry ;
187+ }
188+
138189}
0 commit comments