11package com .stitchdata .client ;
22
3- import java .io .Closeable ;
4- import java .io .EOFException ;
5- import java .io .Flushable ;
6- import java .io .IOException ;
7- import java .io .ByteArrayOutputStream ;
8- import java .io .ByteArrayInputStream ;
9- import java .io .UnsupportedEncodingException ;
10-
11- import java .util .List ;
3+ import com .cognitect .transit .Reader ;
4+ import com .cognitect .transit .TransitFactory ;
5+ import com .cognitect .transit .WriteHandler ;
6+ import com .cognitect .transit .Writer ;
7+ import jakarta .json .Json ;
8+ import jakarta .json .JsonObject ;
9+ import jakarta .json .JsonReader ;
10+
11+ import java .io .*;
12+ import java .net .URI ;
13+ import java .net .http .HttpClient ;
14+ import java .net .http .HttpRequest ;
15+ import java .net .http .HttpResponse ;
16+ import java .nio .charset .StandardCharsets ;
17+ import java .time .Duration ;
1218import java .util .ArrayList ;
13- import java .util .Map ;
1419import java .util .HashMap ;
15- import org .apache .http .client .fluent .Request ;
16- import org .apache .http .client .fluent .Response ;
17- import org .apache .http .client .ClientProtocolException ;
18- import org .apache .http .entity .ContentType ;
19- import org .apache .http .StatusLine ;
20- import org .apache .http .HttpResponse ;
21- import org .apache .http .HttpEntity ;
22- import javax .json .Json ;
23- import javax .json .JsonObject ;
24- import javax .json .JsonReader ;
25- import com .cognitect .transit .Writer ;
26- import com .cognitect .transit .WriteHandler ;
27- import com .cognitect .transit .TransitFactory ;
28- import com .cognitect .transit .Reader ;
20+ import java .util .List ;
21+ import java .util .Map ;
2922
3023/**
3124 * Client for Stitch.
3225 *
3326 * <p>Callers should use {@link StitchClientBuilder} to construct
3427 * instances of {@link StitchClient}.</p>
35- *
28+ * <p>
3629 * A StitchClient takes messages (instances of {@link StitchMessage})
3730 * and submits them to Stitch in batches. A call to {@link
3831 * StitchClient#push(StitchMessage)} adds a message to the current
4639 * batchSizeBytes to 0 will effectively disable batching and cause
4740 * each call to {@link #push(StitchMessage)} to send the message
4841 * immediatley.
49- *
42+ * <p>
5043 * You should open the client in a try-with-resources statement to
5144 * ensure that it is closed, otherwise you will lose any messages that
5245 * have been added to the buffer but not yet delivered.
7265 * }
7366 * }
7467 * </pre>
75- *
68+ * <p>
7669 * Instances of StitchClient are thread-safe. If buffering is enabled
7770 * (which it is by default), then multiple threads will accumulate
7871 * records into the same batch. When one of those threads makes a call
@@ -88,14 +81,14 @@ public class StitchClient implements Flushable, Closeable {
8881
8982 // HTTP constants
9083 public static final String PUSH_URL
91- = "https://api.stitchdata.com/v2/import/push" ;
84+ = "https://api.stitchdata.com/v2/import/push" ;
9285 private static final int HTTP_CONNECT_TIMEOUT = 1000 * 60 * 2 ;
93- private static final ContentType CONTENT_TYPE =
94- ContentType .create ("application/transit+json" );
86+ private static final String CONTENT_TYPE = "application/transit+json" ;
9587
9688 // HTTP properties
9789 private final int connectTimeout = HTTP_CONNECT_TIMEOUT ;
9890 private final String stitchUrl ;
91+ private final HttpClient httpClient ;
9992
10093 // Client-specific message values
10194 private final int clientId ;
@@ -111,7 +104,7 @@ public class StitchClient implements Flushable, Closeable {
111104
112105 private final Buffer buffer ;
113106 private final FlushHandler flushHandler ;
114- private final Map <Class ,WriteHandler <?,?>> writeHandlers ;
107+ private final Map <Class , WriteHandler <?, ?>> writeHandlers ;
115108
116109 private static void putWithDefault (Map map , String key , Object value , Object defaultValue ) {
117110 map .put (key , value != null ? value : defaultValue );
@@ -128,15 +121,16 @@ private byte[] messageToBytes(StitchMessage message) {
128121 HashMap map = new HashMap ();
129122
130123 switch (message .getAction ()) {
131- case UPSERT :
132- map .put ("action" , "upsert" );
133- putWithDefault (map , "key_names" , message .getKeyNames (), keyNames );
134- putIfNotNull (map , "data" , message .getData ());
135- break ;
136- case SWITCH_VIEW :
137- map .put ("action" , "switch_view" );
138- break ;
139- default : throw new IllegalArgumentException ("Action must not be null" );
124+ case UPSERT :
125+ map .put ("action" , "upsert" );
126+ putWithDefault (map , "key_names" , message .getKeyNames (), keyNames );
127+ putIfNotNull (map , "data" , message .getData ());
128+ break ;
129+ case SWITCH_VIEW :
130+ map .put ("action" , "switch_view" );
131+ break ;
132+ default :
133+ throw new IllegalArgumentException ("Action must not be null" );
140134 }
141135
142136 map .put ("client_id" , clientId );
@@ -153,17 +147,16 @@ private byte[] messageToBytes(StitchMessage message) {
153147 }
154148
155149 StitchClient (
156- String stitchUrl ,
157- int clientId ,
158- String token ,
159- String namespace ,
160- String tableName ,
161- List <String > keyNames ,
162- int batchSizeBytes ,
163- int batchDelayMillis ,
164- FlushHandler flushHandler ,
165- Map <Class ,WriteHandler <?,?>> writeHandlers )
166- {
150+ String stitchUrl ,
151+ int clientId ,
152+ String token ,
153+ String namespace ,
154+ String tableName ,
155+ List <String > keyNames ,
156+ int batchSizeBytes ,
157+ int batchDelayMillis ,
158+ FlushHandler flushHandler ,
159+ Map <Class , WriteHandler <?, ?>> writeHandlers ) {
167160 this .stitchUrl = stitchUrl ;
168161 this .clientId = clientId ;
169162 this .token = token ;
@@ -175,6 +168,9 @@ private byte[] messageToBytes(StitchMessage message) {
175168 this .buffer = new Buffer ();
176169 this .flushHandler = flushHandler ;
177170 this .writeHandlers = TransitFactory .writeHandlerMap (writeHandlers );
171+ this .httpClient = HttpClient .newBuilder ()
172+ .connectTimeout (Duration .ofMillis (connectTimeout ))
173+ .build ();
178174 }
179175
180176 /**
@@ -192,8 +188,8 @@ private byte[] messageToBytes(StitchMessage message) {
192188 * @param message the message
193189 * @throws StitchException if Stitch rejected or was unable to
194190 * process the message
195- * @throws IOException if there was an error communicating with
196- * Stitch
191+ * @throws IOException if there was an error communicating with
192+ * Stitch
197193 */
198194 public void push (StitchMessage message ) throws StitchException , IOException {
199195 push (message , message );
@@ -217,13 +213,13 @@ public void push(StitchMessage message) throws StitchException, IOException {
217213 * sent immediately and this function will block until it is
218214 * delivered.</p>
219215 *
220- * @param message the message
221- * @param callbackArg flush handler will be invoked with this as
216+ * @param message the message
217+ * @param callbackArg flush handler will be invoked with this as
222218 * one of the callbackArgs.
223219 * @throws StitchException if Stitch rejected or was unable to
224220 * process the message
225- * @throws IOException if there was an error communicating with
226- * Stitch
221+ * @throws IOException if there was an error communicating with
222+ * Stitch
227223 */
228224 public void push (StitchMessage message , Object callbackArg ) throws StitchException , IOException {
229225 buffer .put (new Buffer .Entry (messageToBytes (message ), callbackArg ));
@@ -234,28 +230,45 @@ public void push(StitchMessage message, Object callbackArg) throws StitchExcepti
234230 }
235231
236232 StitchResponse sendToStitch (String body ) throws IOException {
237- Request request = Request .Post (stitchUrl )
238- .connectTimeout (connectTimeout )
239- .addHeader ("Authorization" , "Bearer " + token )
240- .bodyString (body , CONTENT_TYPE );
241-
242- HttpResponse response = request .execute ().returnResponse ();
243- int statusCode = response .getStatusLine ().getStatusCode ();
244- String reasonPhrase = response .getStatusLine ().getReasonPhrase ();
245- ContentType contentType = ContentType .get (response .getEntity ());
233+ HttpRequest request = HttpRequest .newBuilder (URI .create (stitchUrl ))
234+ .header ("Authorization" , "Bearer " + token )
235+ .header ("Content-Type" , CONTENT_TYPE )
236+ .POST (HttpRequest .BodyPublishers .ofString (body , StandardCharsets .UTF_8 ))
237+ .build ();
238+
239+ HttpResponse <String > response ;
240+ try {
241+ response = httpClient .send (request , HttpResponse .BodyHandlers .ofString (StandardCharsets .UTF_8 ));
242+ } catch (InterruptedException e ) {
243+ Thread .currentThread ().interrupt ();
244+ throw new IOException ("Interrupted while sending request to Stitch" , e );
245+ }
246+
247+ int statusCode = response .statusCode ();
248+ String reasonPhrase = "" ;
249+ String contentType = response .headers ().firstValue ("Content-Type" ).orElse (null );
246250 JsonObject content = null ;
247251
248252 // Don't attempt to parse body for 5xx responses or if the
249253 // Content-Type doesn't explicitly state application/json.
250- if (statusCode < 500 &&
251- contentType != null &&
252- ContentType .APPLICATION_JSON .getMimeType ().equals (contentType .getMimeType ())) {
253- JsonReader rdr = Json .createReader (response .getEntity ().getContent ());
254+ if (statusCode < 500 && isJsonContentType (contentType )) {
255+ JsonReader rdr = Json .createReader (new StringReader (response .body ()));
254256 content = rdr .readObject ();
255257 }
256258 return new StitchResponse (statusCode , reasonPhrase , content );
257259 }
258260
261+ private static boolean isJsonContentType (String contentTypeHeader ) {
262+ if (contentTypeHeader == null ) {
263+ return false ;
264+ }
265+ int separator = contentTypeHeader .indexOf (';' );
266+ String mimeType = separator >= 0
267+ ? contentTypeHeader .substring (0 , separator ).trim ()
268+ : contentTypeHeader .trim ();
269+ return "application/json" .equalsIgnoreCase (mimeType );
270+ }
271+
259272 void sendBatch (List <Buffer .Entry > batch ) throws IOException {
260273
261274 String body = serializeEntries (batch );
@@ -285,7 +298,7 @@ static String serializeEntries(List<Buffer.Entry> entries) throws UnsupportedEnc
285298 for (Buffer .Entry entry : entries ) {
286299 ByteArrayInputStream bais = new ByteArrayInputStream (entry .bytes );
287300 Reader reader = TransitFactory .reader (TransitFactory .Format .JSON , bais );
288- messages .add ((Map )reader .read ());
301+ messages .add ((Map ) reader .read ());
289302 }
290303
291304 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
@@ -299,8 +312,8 @@ static String serializeEntries(List<Buffer.Entry> entries) throws UnsupportedEnc
299312 *
300313 * @throws StitchException if Stitch rejected or was unable to
301314 * process the message
302- * @throws IOException if there was an error communicating with
303- * Stitch
315+ * @throws IOException if there was an error communicating with
316+ * Stitch
304317 */
305318 public void flush () throws IOException {
306319 while (true ) {
@@ -317,8 +330,8 @@ public void flush() throws IOException {
317330 *
318331 * @throws StitchException if Stitch rejected or was unable to
319332 * process the message
320- * @throws IOException if there was an error communicating with
321- * Stitch
333+ * @throws IOException if there was an error communicating with
334+ * Stitch
322335 */
323336 public void close () throws IOException {
324337 flush ();
0 commit comments