Skip to content

Commit d7dcec6

Browse files
committed
JCBC-1209: Audit and add explicit subscribers on req creation
Motivation ---------- In an earlier changeset the explicit subscriber management on the request has been added, but some of the APIs have been overlooked - this can lead to inconsistent behavior in core-io. Modifications ------------- This changeset audits all calls to core.send(...) and where needed adds the explicit subscriber management on the request since core-io checks if a subscriber is attached and if not it never treats it as non-active. Especially n1ql requests in this regard have been treaded differently before this changeset compared to kv requests. Result ------ Complete audit of .send() APIs and every request now properly handles it subscriber and can be checked against timeouts in core-io. Change-Id: I71f752384af7ac4fa88534531401901f92a1fb24 Reviewed-on: http://review.couchbase.org/95227 Reviewed-by: Michael Nitschinger <[email protected]> Tested-by: Michael Nitschinger <[email protected]>
1 parent 4fa5c71 commit d7dcec6

File tree

10 files changed

+223
-99
lines changed

10 files changed

+223
-99
lines changed

src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -813,11 +813,12 @@ public Observable<? extends D> call(Throwable throwable) {
813813

814814
@Override
815815
public Observable<AsyncViewResult> query(final ViewQuery query) {
816-
Observable<ViewQueryResponse> source = Observable.defer(new Func0<Observable<ViewQueryResponse>>() {
816+
Observable<ViewQueryResponse> source = deferAndWatch(new Func1<Subscriber, Observable<ViewQueryResponse>>() {
817817
@Override
818-
public Observable<ViewQueryResponse> call() {
818+
public Observable<ViewQueryResponse> call(Subscriber subscriber) {
819819
final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(),
820820
query.isDevelopment(), query.toQueryString(), query.getKeys(), bucket, username, password);
821+
request.subscriber(subscriber);
821822
return core.send(request);
822823
}
823824
});
@@ -841,11 +842,12 @@ public Observable<AsyncSearchQueryResult> query(final SearchQuery query) {
841842
query.serverSideTimeout(environment().searchTimeout(), TimeUnit.MILLISECONDS);
842843
}
843844

844-
Observable<SearchQueryResponse> source = Observable.defer(new Func0<Observable<SearchQueryResponse>>() {
845+
Observable<SearchQueryResponse> source = deferAndWatch(new Func1<Subscriber, Observable<SearchQueryResponse>>() {
845846
@Override
846-
public Observable<SearchQueryResponse> call() {
847+
public Observable<SearchQueryResponse> call(Subscriber subscriber) {
847848
final SearchQueryRequest request =
848-
new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password);
849+
new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password);
850+
request.subscriber(subscriber);
849851
return core.send(request);
850852
}
851853
});
@@ -872,11 +874,12 @@ public AsyncSearchQueryResult call(SearchQueryResponse response) {
872874

873875
@Override
874876
public Observable<AsyncSpatialViewResult> query(final SpatialViewQuery query) {
875-
Observable<ViewQueryResponse> source = Observable.defer(new Func0<Observable<ViewQueryResponse>>() {
877+
Observable<ViewQueryResponse> source = deferAndWatch(new Func1<Subscriber, Observable<ViewQueryResponse>>() {
876878
@Override
877-
public Observable<ViewQueryResponse> call() {
879+
public Observable<ViewQueryResponse> call(Subscriber subscriber) {
878880
final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(),
879-
query.isDevelopment(), true, query.toString(), null, bucket, username, password);
881+
query.isDevelopment(), true, query.toString(), null, bucket, username, password);
882+
request.subscriber(subscriber);
880883
return core.send(request);
881884
}
882885
});

src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import com.couchbase.client.java.error.TranscodingException;
2424
import com.couchbase.client.java.transcoder.TranscoderUtils;
2525
import rx.Observable;
26-
import rx.functions.Func0;
26+
import rx.Subscriber;
2727
import rx.functions.Func1;
2828
import rx.functions.Func6;
2929

3030
import java.util.Arrays;
3131
import java.util.List;
3232

3333
import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER;
34+
import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
3435

3536
public class AnalyticsQueryExecutor {
3637

@@ -47,10 +48,13 @@ public AnalyticsQueryExecutor(ClusterFacade core, String bucket, String username
4748
}
4849

4950
public Observable<AsyncAnalyticsQueryResult> execute(final AnalyticsQuery query) {
50-
return Observable.defer(new Func0<Observable<GenericAnalyticsResponse>>() {
51+
return deferAndWatch(new Func1<Subscriber, Observable<GenericAnalyticsResponse>>() {
5152
@Override
52-
public Observable<GenericAnalyticsResponse> call() {
53-
return core.send(GenericAnalyticsRequest.jsonQuery(query.query().toString(), bucket, username, password));
53+
public Observable<GenericAnalyticsResponse> call(final Subscriber subscriber) {
54+
GenericAnalyticsRequest request = GenericAnalyticsRequest
55+
.jsonQuery(query.query().toString(), bucket, username, password);
56+
request.subscriber(subscriber);
57+
return core.<GenericAnalyticsResponse>send(request);
5458
}
5559
}).flatMap(new Func1<GenericAnalyticsResponse, Observable<AsyncAnalyticsQueryResult>>() {
5660
@Override

src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
3232
import com.couchbase.client.java.error.FlushDisabledException;
3333
import rx.Observable;
34+
import rx.Subscriber;
3435
import rx.functions.Action1;
3536
import rx.functions.Func1;
3637
import rx.functions.Func2;
@@ -39,6 +40,7 @@
3940
import java.util.List;
4041
import java.util.concurrent.TimeUnit;
4142

43+
import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
4244
import static com.couchbase.client.java.util.retry.RetryBuilder.any;
4345

4446
/**
@@ -118,8 +120,15 @@ private static Observable<List<String>> createMarkerDocuments(final ClusterFacad
118120
.from(FLUSH_MARKERS)
119121
.flatMap(new Func1<String, Observable<UpsertResponse>>() {
120122
@Override
121-
public Observable<UpsertResponse> call(String id) {
122-
return core.send(new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket));
123+
public Observable<UpsertResponse> call(final String id) {
124+
return deferAndWatch(new Func1<Subscriber, Observable<? extends UpsertResponse>>() {
125+
@Override
126+
public Observable<? extends UpsertResponse> call(final Subscriber subscriber) {
127+
UpsertRequest request = new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket);
128+
request.subscriber(subscriber);
129+
return core.send(request);
130+
}
131+
});
123132
}
124133
})
125134
.doOnNext(new Action1<UpsertResponse>() {
@@ -152,8 +161,14 @@ public List<String> call(UpsertResponse response) {
152161
* @return an observable indicating if done (true) or polling needs to happen (false).
153162
*/
154163
private static Observable<Boolean> initiateFlush(final ClusterFacade core, final String bucket, final String username, final String password) {
155-
return core
156-
.<FlushResponse>send(new FlushRequest(bucket, username, password))
164+
return deferAndWatch(new Func1<Subscriber, Observable<FlushResponse>>() {
165+
@Override
166+
public Observable<FlushResponse> call(Subscriber subscriber) {
167+
FlushRequest request = new FlushRequest(bucket, username, password);
168+
request.subscriber(subscriber);
169+
return core.send(request);
170+
}
171+
})
157172
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
158173
.map(new Func1<FlushResponse, Boolean>() {
159174
@Override
@@ -182,8 +197,15 @@ private static Observable<Boolean> pollMarkerDocuments(final ClusterFacade core,
182197
.from(FLUSH_MARKERS)
183198
.flatMap(new Func1<String, Observable<GetResponse>>() {
184199
@Override
185-
public Observable<GetResponse> call(String id) {
186-
return core.send(new GetRequest(id, bucket));
200+
public Observable<GetResponse> call(final String id) {
201+
return deferAndWatch(new Func1<Subscriber, Observable<? extends GetResponse>>() {
202+
@Override
203+
public Observable<? extends GetResponse> call(Subscriber subscriber) {
204+
GetRequest request = new GetRequest(id, bucket);
205+
request.subscriber(subscriber);
206+
return core.send(request);
207+
}
208+
});
187209
}
188210
})
189211
.reduce(0, new Func2<Integer, GetResponse, Integer>() {

src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,15 @@ public Observable<BucketInfo> info() {
117117
return Observable.defer(new Func0<Observable<BucketConfigResponse>>() {
118118
@Override
119119
public Observable<BucketConfigResponse> call() {
120-
return core.send(new BucketConfigRequest("/pools/default/buckets/", null, bucket, username, password));
120+
return deferAndWatch(new Func1<Subscriber, Observable<? extends BucketConfigResponse>>() {
121+
@Override
122+
public Observable<? extends BucketConfigResponse> call(Subscriber subscriber) {
123+
BucketConfigRequest request = new BucketConfigRequest("/pools/default/buckets/",
124+
null, bucket, username, password);
125+
request.subscriber(subscriber);
126+
return core.send(request);
127+
}
128+
});
121129
}
122130
})
123131
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -151,7 +159,14 @@ public Observable<DesignDocument> getDesignDocuments(final boolean development)
151159
return Observable.defer(new Func0<Observable<GetDesignDocumentsResponse>>() {
152160
@Override
153161
public Observable<GetDesignDocumentsResponse> call() {
154-
return core.send(new GetDesignDocumentsRequest(bucket, username, password));
162+
return deferAndWatch(new Func1<Subscriber, Observable<? extends GetDesignDocumentsResponse>>() {
163+
@Override
164+
public Observable<? extends GetDesignDocumentsResponse> call(final Subscriber subscriber) {
165+
GetDesignDocumentsRequest request = new GetDesignDocumentsRequest(bucket, username, password);
166+
request.subscriber(subscriber);
167+
return core.send(request);
168+
}
169+
});
155170
}
156171
})
157172
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())

src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333
import com.couchbase.client.java.error.CouchbaseOutOfMemoryException;
3434
import com.couchbase.client.java.error.TemporaryFailureException;
3535
import rx.Observable;
36+
import rx.Subscriber;
3637
import rx.functions.Func0;
3738
import rx.functions.Func1;
3839

3940
import java.util.ArrayList;
4041
import java.util.List;
42+
import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
4143

4244
/**
4345
* Helper class to deal with reading from zero to N replicas and returning results.
@@ -70,15 +72,19 @@ private ReplicaReader() {}
7072
public static Observable<GetResponse> read(final ClusterFacade core, final String id,
7173
final ReplicaMode type, final String bucket) {
7274
return assembleRequests(core, id, type, bucket)
73-
.flatMap(new Func1<BinaryRequest, Observable<GetResponse>>() {
74-
@Override
75-
public Observable<GetResponse> call(BinaryRequest request) {
76-
return core
77-
.<GetResponse>send(request)
78-
.filter(GetResponseFilter.INSTANCE)
79-
.onErrorResumeNext(GetResponseErrorHandler.INSTANCE);
80-
}
81-
});
75+
.flatMap(new Func1<BinaryRequest, Observable<GetResponse>>() {
76+
@Override
77+
public Observable<GetResponse> call(final BinaryRequest request) {
78+
Observable<GetResponse> result = deferAndWatch(new Func1<Subscriber, Observable<GetResponse>>() {
79+
@Override
80+
public Observable<GetResponse> call(Subscriber subscriber) {
81+
request.subscriber(subscriber);
82+
return core.send(request);
83+
}
84+
}).filter(GetResponseFilter.INSTANCE);
85+
return result.onErrorResumeNext(GetResponseErrorHandler.INSTANCE);
86+
}
87+
});
8288
}
8389

8490
/**

0 commit comments

Comments
 (0)