synchronizedvoidenqueue(AsyncCallcall){if(runningAsyncCalls.size()<maxRequests&&runningCallsForHost(call)<maxRequestsPerHost){runningAsyncCalls.add(call);executorService().execute(call);}else{readyAsyncCalls.add(call);}}private<T>voidfinished(Deque<T>calls,Tcall,booleanpromoteCalls){intrunningCallsCount;RunnableidleCallback;synchronized(this){if(!calls.remove(call))thrownewAssertionError("Call wasn't in-flight!");if(promoteCalls)promoteCalls();runningCallsCount=runningCallsCount();idleCallback=this.idleCallback;}if(runningCallsCount==0&&idleCallback!=null){idleCallback.run();}}privatevoidpromoteCalls(){if(runningAsyncCalls.size()>=maxRequests)return;// Already running max capacity.if(readyAsyncCalls.isEmpty())return;// No ready calls to promote.for(Iterator<AsyncCall>i=readyAsyncCalls.iterator();i.hasNext();){AsyncCallcall=i.next();if(runningCallsForHost(call)<maxRequestsPerHost){i.remove();runningAsyncCalls.add(call);executorService().execute(call);}if(runningAsyncCalls.size()>=maxRequests)return;// Reached max capacity.}}
ResponsegetResponseWithInterceptorChain()throwsIOException{// Build a full stack of interceptors.List<Interceptor>interceptors=newArrayList<>();interceptors.addAll(client.interceptors());interceptors.add(retryAndFollowUpInterceptor);interceptors.add(newBridgeInterceptor(client.cookieJar()));interceptors.add(newCacheInterceptor(client.internalCache()));interceptors.add(newConnectInterceptor(client));if(!forWebSocket){interceptors.addAll(client.networkInterceptors());}interceptors.add(newCallServerInterceptor(forWebSocket));Interceptor.Chainchain=newRealInterceptorChain(interceptors,null,null,null,0,originalRequest,this,eventListener,client.connectTimeoutMillis(),client.readTimeoutMillis(),client.writeTimeoutMillis());returnchain.proceed(originalRequest);}
// Call the next interceptor in the chain.RealInterceptorChainnext=newRealInterceptorChain(interceptors,streamAllocation,httpCodec,connection,index+1,request,call,eventListener,connectTimeout,readTimeout,writeTimeout);Interceptorinterceptor=interceptors.get(index);Responseresponse=interceptor.intercept(next);
StreamAllocationstreamAllocation=newStreamAllocation(client.connectionPool(),createAddress(request.url()),call,eventListener,callStackTrace);this.streamAllocation=streamAllocation;intfollowUpCount=0;ResponsepriorResponse=null;while(true){if(canceled){streamAllocation.release();thrownewIOException("Canceled");}Responseresponse;booleanreleaseConnection=true;try{response=realChain.proceed(request,streamAllocation,null,null);releaseConnection=false;}catch(RouteExceptione){// The attempt to connect via a route failed. The request will not have been sent.if(!recover(e.getLastConnectException(),streamAllocation,false,request)){throwe.getLastConnectException();}releaseConnection=false;continue;}catch(IOExceptione){// An attempt to communicate with a server failed. The request may have been sent.booleanrequestSendStarted=!(einstanceofConnectionShutdownException);if(!recover(e,streamAllocation,requestSendStarted,request))throwe;releaseConnection=false;continue;}finally{// We're throwing an unchecked exception. Release any resources.if(releaseConnection){streamAllocation.streamFailed(null);streamAllocation.release();}}// Attach the prior response if it exists. 
Such responses never have a body.if(priorResponse!=null){response=response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build()).build();}RequestfollowUp=followUpRequest(response,streamAllocation.route());if(followUp==null){if(!forWebSocket){streamAllocation.release();}returnresponse;}closeQuietly(response.body());if(++followUpCount>MAX_FOLLOW_UPS){streamAllocation.release();thrownewProtocolException("Too many follow-up requests: "+followUpCount);}if(followUp.body()instanceofUnrepeatableRequestBody){streamAllocation.release();thrownewHttpRetryException("Cannot retry streamed HTTP body",response.code());}if(!sameConnection(response,followUp.url())){streamAllocation.release();streamAllocation=newStreamAllocation(client.connectionPool(),createAddress(followUp.url()),call,eventListener,callStackTrace);this.streamAllocation=streamAllocation;}elseif(streamAllocation.codec()!=null){thrownewIllegalStateException("Closing the body of "+response+" didn't close its backing stream. Bad interceptor?");}request=followUp;priorResponse=response;}
@OverridepublicResponseintercept(Chainchain)throwsIOException{RequestuserRequest=chain.request();Request.BuilderrequestBuilder=userRequest.newBuilder();RequestBodybody=userRequest.body();if(body!=null){MediaTypecontentType=body.contentType();if(contentType!=null){requestBuilder.header("Content-Type",contentType.toString());}longcontentLength=body.contentLength();if(contentLength!=-1){requestBuilder.header("Content-Length",Long.toString(contentLength));requestBuilder.removeHeader("Transfer-Encoding");}else{requestBuilder.header("Transfer-Encoding","chunked");requestBuilder.removeHeader("Content-Length");}}if(userRequest.header("Host")==null){requestBuilder.header("Host",hostHeader(userRequest.url(),false));}if(userRequest.header("Connection")==null){requestBuilder.header("Connection","Keep-Alive");}// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing// the transfer stream.booleantransparentGzip=false;if(userRequest.header("Accept-Encoding")==null&&userRequest.header("Range")==null){transparentGzip=true;requestBuilder.header("Accept-Encoding","gzip");}List<Cookie>cookies=cookieJar.loadForRequest(userRequest.url());if(!cookies.isEmpty()){requestBuilder.header("Cookie",cookieHeader(cookies));}if(userRequest.header("User-Agent")==null){requestBuilder.header("User-Agent",Version.userAgent());}ResponsenetworkResponse=chain.proceed(requestBuilder.build());HttpHeaders.receiveHeaders(cookieJar,userRequest.url(),networkResponse.headers());Response.BuilderresponseBuilder=networkResponse.newBuilder().request(userRequest);if(transparentGzip&&"gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))&&HttpHeaders.hasBody(networkResponse)){GzipSourceresponseBody=newGzipSource(networkResponse.body().source());HeadersstrippedHeaders=networkResponse.headers().newBuilder().removeAll("Content-Encoding").removeAll("Content-Length").build();responseBuilder.headers(strippedHeaders);StringcontentType=networkResponse.header("Content-Type");re
sponseBuilder.body(newRealResponseBody(contentType,-1L,Okio.buffer(responseBody)));}returnresponseBuilder.build();}
@NullableResponseget(Requestrequest){Stringkey=key(request.url());DiskLruCache.Snapshotsnapshot;Entryentry;try{snapshot=cache.get(key);if(snapshot==null){returnnull;}}catch(IOExceptione){// Give up because the cache cannot be read.returnnull;}try{entry=newEntry(snapshot.getSource(ENTRY_METADATA));}catch(IOExceptione){Util.closeQuietly(snapshot);returnnull;}Responseresponse=entry.response(snapshot);if(!entry.matches(request,response)){Util.closeQuietly(response.body());returnnull;}returnresponse;}
@NullableCacheRequestput(Responseresponse){StringrequestMethod=response.request().method();if(HttpMethod.invalidatesCache(response.request().method())){try{remove(response.request());}catch(IOExceptionignored){// The cache cannot be written.}returnnull;}if(!requestMethod.equals("GET")){// Don't cache non-GET responses. We're technically allowed to cache// HEAD requests and some POST requests, but the complexity of doing// so is high and the benefit is low.returnnull;}if(HttpHeaders.hasVaryAll(response)){returnnull;}// 注意这里Entryentry=newEntry(response);DiskLruCache.Editoreditor=null;try{editor=cache.edit(key(response.request().url()));if(editor==null){returnnull;}entry.writeTo(editor);//注意这里returnnewCacheRequestImpl(editor);}catch(IOExceptione){abortQuietly(editor);returnnull;}}
@OverridepublicResponseintercept(Chainchain)throwsIOException{ResponsecacheCandidate=cache!=null?cache.get(chain.request()):null;longnow=System.currentTimeMillis();CacheStrategystrategy=newCacheStrategy.Factory(now,chain.request(),cacheCandidate).get();RequestnetworkRequest=strategy.networkRequest;ResponsecacheResponse=strategy.cacheResponse;if(cache!=null){cache.trackResponse(strategy);}if(cacheCandidate!=null&&cacheResponse==null){closeQuietly(cacheCandidate.body());// The cache candidate wasn't applicable. Close it.}// If we're forbidden from using the network and the cache is insufficient, fail.if(networkRequest==null&&cacheResponse==null){returnnewResponse.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(504).message("Unsatisfiable Request (only-if-cached)").body(Util.EMPTY_RESPONSE).sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build();}// If we don't need the network, we're done.if(networkRequest==null){returncacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();}ResponsenetworkResponse=null;try{networkResponse=chain.proceed(networkRequest);}finally{// If we're crashing on I/O or otherwise, don't leak the cache body.if(networkResponse==null&&cacheCandidate!=null){closeQuietly(cacheCandidate.body());}}// If we have a cache response too, then we're doing a conditional get.if(cacheResponse!=null){if(networkResponse.code()==HTTP_NOT_MODIFIED){Responseresponse=cacheResponse.newBuilder().headers(combine(cacheResponse.headers(),networkResponse.headers())).sentRequestAtMillis(networkResponse.sentRequestAtMillis()).receivedResponseAtMillis(networkResponse.receivedResponseAtMillis()).cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build();networkResponse.body().close();// Update the cache after combining headers but before stripping the// Content-Encoding header (as performed by 
initContentStream()).cache.trackConditionalCacheHit();cache.update(cacheResponse,response);returnresponse;}else{closeQuietly(cacheResponse.body());}}Responseresponse=networkResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse)).build();if(cache!=null){if(HttpHeaders.hasBody(response)&&CacheStrategy.isCacheable(response,networkRequest)){// Offer this request to the cache.// *****注意这里*****CacheRequestcacheRequest=cache.put(response);returncacheWritingResponse(cacheRequest,response);}if(HttpMethod.invalidatesCache(networkRequest.method())){try{cache.remove(networkRequest);}catch(IOExceptionignored){// The cache cannot be written.}}}returnresponse;}
可以看到,这里主要是对 cacheResponse 和 networkResponse 进行处理:当服务器返回 304 Not Modified 时,把缓存响应与网络响应的头部合并并更新缓存;否则,在响应可缓存的情况下,通过 cache.put 将网络返回的 Response 写入缓存。
/** * Returns a new source that writes bytes to {@code cacheRequest} as they are read by the source * consumer. This is careful to discard bytes left over when the stream is closed; otherwise we * may never exhaust the source stream and therefore not complete the cached response. */privateResponsecacheWritingResponse(finalCacheRequestcacheRequest,Responseresponse)throwsIOException{// Some apps return a null body; for compatibility we treat that like a null cache request.if(cacheRequest==null)returnresponse;SinkcacheBodyUnbuffered=cacheRequest.body();if(cacheBodyUnbuffered==null)returnresponse;finalBufferedSourcesource=response.body().source();finalBufferedSinkcacheBody=Okio.buffer(cacheBodyUnbuffered);SourcecacheWritingSource=newSource(){booleancacheRequestClosed;@Overridepubliclongread(Buffersink,longbyteCount)throwsIOException{longbytesRead;try{bytesRead=source.read(sink,byteCount);}catch(IOExceptione){if(!cacheRequestClosed){cacheRequestClosed=true;cacheRequest.abort();// Failed to write a complete cache response.}throwe;}if(bytesRead==-1){if(!cacheRequestClosed){cacheRequestClosed=true;cacheBody.close();// The cache response is complete!}return-1;}sink.copyTo(cacheBody.buffer(),sink.size()-bytesRead,bytesRead);cacheBody.emitCompleteSegments();returnbytesRead;}@OverridepublicTimeouttimeout(){returnsource.timeout();}@Overridepublicvoidclose()throwsIOException{if(!cacheRequestClosed&&!discard(this,HttpCodec.DISCARD_STREAM_TIMEOUT_MILLIS,MILLISECONDS)){cacheRequestClosed=true;cacheRequest.abort();}source.close();}};StringcontentType=response.header("Content-Type");longcontentLength=response.body().contentLength();returnresponse.newBuilder().body(newRealResponseBody(contentType,contentLength,Okio.buffer(cacheWritingSource))).build();}
// We need the network to satisfy this request. Possibly for validating a conditional GET.booleandoExtensiveHealthChecks=!request.method().equals("GET");HttpCodechttpCodec=streamAllocation.newStream(client,chain,doExtensiveHealthChecks);RealConnectionconnection=streamAllocation.connection();
publicHttpCodecnewStream(OkHttpClientclient,Interceptor.Chainchain,booleandoExtensiveHealthChecks){intconnectTimeout=chain.connectTimeoutMillis();intreadTimeout=chain.readTimeoutMillis();intwriteTimeout=chain.writeTimeoutMillis();intpingIntervalMillis=client.pingIntervalMillis();booleanconnectionRetryEnabled=client.retryOnConnectionFailure();try{RealConnectionresultConnection=findHealthyConnection(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled,doExtensiveHealthChecks);HttpCodecresultCodec=resultConnection.newCodec(client,chain,this);synchronized(connectionPool){codec=resultCodec;returnresultCodec;}}catch(IOExceptione){thrownewRouteException(e);}}/** * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated * until a healthy connection is found. */privateRealConnectionfindHealthyConnection(intconnectTimeout,intreadTimeout,intwriteTimeout,intpingIntervalMillis,booleanconnectionRetryEnabled,booleandoExtensiveHealthChecks)throwsIOException{while(true){RealConnectioncandidate=findConnection(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled);// If this is a brand new connection, we can skip the extensive health checks.synchronized(connectionPool){if(candidate.successCount==0){returncandidate;}}// Do a (potentially slow) check to confirm that the pooled connection is still good. If it// isn't, take it out of the pool and start again.if(!candidate.isHealthy(doExtensiveHealthChecks)){noNewStreams();continue;}returncandidate;}}/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. 
*/privateRealConnectionfindConnection(intconnectTimeout,intreadTimeout,intwriteTimeout,intpingIntervalMillis,booleanconnectionRetryEnabled)throwsIOException{booleanfoundPooledConnection=false;RealConnectionresult=null;RouteselectedRoute=null;ConnectionreleasedConnection;SockettoClose;synchronized(connectionPool){if(released)thrownewIllegalStateException("released");if(codec!=null)thrownewIllegalStateException("codec != null");if(canceled)thrownewIOException("Canceled");// Attempt to use an already-allocated connection. We need to be careful here because our// already-allocated connection may have been restricted from creating new streams.releasedConnection=this.connection;toClose=releaseIfNoNewStreams();if(this.connection!=null){// We had an already-allocated connection and it's good.result=this.connection;releasedConnection=null;}if(!reportedAcquired){// If the connection was never reported acquired, don't report it as released!releasedConnection=null;}if(result==null){// Attempt to get a connection from the pool.// 注意这里,从ConnectionPool中找ConnectionInternal.instance.get(connectionPool,address,this,null);if(connection!=null){foundPooledConnection=true;result=connection;}else{selectedRoute=route;}}}closeQuietly(toClose);if(releasedConnection!=null){eventListener.connectionReleased(call,releasedConnection);}if(foundPooledConnection){eventListener.connectionAcquired(call,result);}if(result!=null){// If we found an already-allocated or pooled connection, we're done.returnresult;}// If we need a route selection, make one. This is a blocking operation.booleannewRouteSelection=false;if(selectedRoute==null&&(routeSelection==null||!routeSelection.hasNext())){newRouteSelection=true;routeSelection=routeSelector.next();}synchronized(connectionPool){if(canceled)thrownewIOException("Canceled");if(newRouteSelection){// Now that we have a set of IP addresses, make another attempt at getting a connection from// the pool. 
This could match due to connection coalescing.List<Route>routes=routeSelection.getAll();for(inti=0,size=routes.size();i<size;i++){Routeroute=routes.get(i);Internal.instance.get(connectionPool,address,this,route);if(connection!=null){foundPooledConnection=true;result=connection;this.route=route;break;}}}if(!foundPooledConnection){if(selectedRoute==null){selectedRoute=routeSelection.next();}// Create a connection and assign it to this allocation immediately. This makes it possible// for an asynchronous cancel() to interrupt the handshake we're about to do.route=selectedRoute;refusedStreamCount=0;result=newRealConnection(connectionPool,selectedRoute);acquire(result,false);}}// If we found a pooled connection on the 2nd time around, we're done.if(foundPooledConnection){eventListener.connectionAcquired(call,result);returnresult;}// Do TCP + TLS handshakes. This is a blocking operation.// 进行tcp握手result.connect(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled,call,eventListener);routeDatabase().connected(result.route());Socketsocket=null;synchronized(connectionPool){reportedAcquired=true;// Pool the connection.// 把connection放到ConnectionPool中Internal.instance.put(connectionPool,result);// If another multiplexed connection to the same address was created concurrently, then// release this connection and acquire that one.if(result.isMultiplexed()){socket=Internal.instance.deduplicate(connectionPool,address,this);result=connection;}}closeQuietly(socket);eventListener.connectionAcquired(call,result);returnresult;}
/** * Returns a recycled connection to {@code address}, or null if no such connection exists. The * route is null if the address has not yet been routed. */@NullableRealConnectionget(Addressaddress,StreamAllocationstreamAllocation,Routeroute){assert(Thread.holdsLock(this));for(RealConnectionconnection:connections){if(connection.isEligible(address,route)){streamAllocation.acquire(connection,true);returnconnection;}}returnnull;}
/** * Returns true if this connection can carry a stream allocation to {@code address}. If non-null * {@code route} is the resolved route for a connection. */publicbooleanisEligible(Addressaddress,@NullableRouteroute){// If this connection is not accepting new streams, we're done.if(allocations.size()>=allocationLimit||noNewStreams)returnfalse;// If the non-host fields of the address don't overlap, we're done.if(!Internal.instance.equalsNonHost(this.route.address(),address))returnfalse;// If the host exactly matches, we're done: this connection can carry the address.if(address.url().host().equals(this.route().address().url().host())){returntrue;// This connection is a perfect match.}// At this point we don't have a hostname match. But we still be able to carry the request if// our connection coalescing requirements are met. See also:// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/// 1. This connection must be HTTP/2.if(http2Connection==null)returnfalse;// 2. The routes must share an IP address. This requires us to have a DNS address for both// hosts, which only happens after route planning. We can't coalesce connections that use a// proxy, since proxies don't tell us the origin server's IP address.if(route==null)returnfalse;if(route.proxy().type()!=Proxy.Type.DIRECT)returnfalse;if(this.route.proxy().type()!=Proxy.Type.DIRECT)returnfalse;if(!this.route.socketAddress().equals(route.socketAddress()))returnfalse;// 3. This connection's server certificate's must cover the new host.if(route.address().hostnameVerifier()!=OkHostnameVerifier.INSTANCE)returnfalse;if(!supportsUrl(address.url()))returnfalse;// 4. Certificate pinning must match the host.try{address.certificatePinner().check(address.url().host(),handshake().peerCertificates());}catch(SSLPeerUnverifiedExceptione){returnfalse;}returntrue;// The caller's address can be carried by this connection.}
123456789101112
/** * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to * {@link #release} on the same connection. */publicvoidacquire(RealConnectionconnection,booleanreportedAcquired){assert(Thread.holdsLock(connectionPool));if(this.connection!=null)thrownewIllegalStateException();this.connection=connection;this.reportedAcquired=reportedAcquired;connection.allocations.add(newStreamAllocationReference(this,callStackTrace));}
privatefinalRunnablecleanupRunnable=newRunnable(){@Overridepublicvoidrun(){while(true){longwaitNanos=cleanup(System.nanoTime());if(waitNanos==-1)return;if(waitNanos>0){longwaitMillis=waitNanos/1000000L;waitNanos-=(waitMillis*1000000L);synchronized(ConnectionPool.this){try{ConnectionPool.this.wait(waitMillis,(int)waitNanos);}catch(InterruptedExceptionignored){}}}}}};longcleanup(longnow){intinUseConnectionCount=0;intidleConnectionCount=0;RealConnectionlongestIdleConnection=null;longlongestIdleDurationNs=Long.MIN_VALUE;// Find either a connection to evict, or the time that the next eviction is due.synchronized(this){for(Iterator<RealConnection>i=connections.iterator();i.hasNext();){RealConnectionconnection=i.next();// If the connection is in use, keep searching.if(pruneAndGetAllocationCount(connection,now)>0){inUseConnectionCount++;continue;}idleConnectionCount++;// If the connection is ready to be evicted, we're done.longidleDurationNs=now-connection.idleAtNanos;if(idleDurationNs>longestIdleDurationNs){longestIdleDurationNs=idleDurationNs;longestIdleConnection=connection;}}if(longestIdleDurationNs>=this.keepAliveDurationNs||idleConnectionCount>this.maxIdleConnections){// We've found a connection to evict. Remove it from the list, then close it below (outside// of the synchronized block).connections.remove(longestIdleConnection);}elseif(idleConnectionCount>0){// A connection will be ready to evict soon.returnkeepAliveDurationNs-longestIdleDurationNs;}elseif(inUseConnectionCount>0){// All connections are in use. It'll be at least the keep alive duration 'til we run again.returnkeepAliveDurationNs;}else{// No connections, idle or in use.cleanupRunning=false;return-1;}}closeQuietly(longestIdleConnection.socket());// Cleanup again immediately.return0;}/** * Prunes any leaked allocations and then returns the number of remaining live allocations on * {@code connection}. Allocations are leaked if the connection is tracking them but the * application code has abandoned them. 
Leak detection is imprecise and relies on garbage * collection. */privateintpruneAndGetAllocationCount(RealConnectionconnection,longnow){List<Reference<StreamAllocation>>references=connection.allocations;for(inti=0;i<references.size();){Reference<StreamAllocation>reference=references.get(i);if(reference.get()!=null){i++;continue;}// We've discovered a leaked allocation. This is an application bug.StreamAllocation.StreamAllocationReferencestreamAllocRef=(StreamAllocation.StreamAllocationReference)reference;Stringmessage="A connection to "+connection.route().address().url()+" was leaked. Did you forget to close a response body?";Platform.get().logCloseableLeak(message,streamAllocRef.callStackTrace);references.remove(i);connection.noNewStreams=true;// If this was the last allocation, the connection is eligible for immediate eviction.if(references.isEmpty()){connection.idleAtNanos=now-keepAliveDurationNs;return0;}}returnreferences.size();}publicstaticfinalclassStreamAllocationReferenceextendsWeakReference<StreamAllocation>{/** * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for * identifying the origin of connection leaks. */publicfinalObjectcallStackTrace;StreamAllocationReference(StreamAllocationreferent,ObjectcallStackTrace){super(referent);this.callStackTrace=callStackTrace;}}
@OverridepublicResponseintercept(Chainchain)throwsIOException{RealInterceptorChainrealChain=(RealInterceptorChain)chain;HttpCodechttpCodec=realChain.httpStream();StreamAllocationstreamAllocation=realChain.streamAllocation();RealConnectionconnection=(RealConnection)realChain.connection();Requestrequest=realChain.request();longsentRequestMillis=System.currentTimeMillis();realChain.eventListener().requestHeadersStart(realChain.call());httpCodec.writeRequestHeaders(request);realChain.eventListener().requestHeadersEnd(realChain.call(),request);Response.BuilderresponseBuilder=null;if(HttpMethod.permitsRequestBody(request.method())&&request.body()!=null){// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100// Continue" response before transmitting the request body. If we don't get that, return// what we did get (such as a 4xx response) without ever transmitting the request body.if("100-continue".equalsIgnoreCase(request.header("Expect"))){httpCodec.flushRequest();realChain.eventListener().responseHeadersStart(realChain.call());responseBuilder=httpCodec.readResponseHeaders(true);}if(responseBuilder==null){// Write the request body if the "Expect: 100-continue" expectation was met.realChain.eventListener().requestBodyStart(realChain.call());longcontentLength=request.body().contentLength();CountingSinkrequestBodyOut=newCountingSink(httpCodec.createRequestBody(request,contentLength));BufferedSinkbufferedRequestBody=Okio.buffer(requestBodyOut);request.body().writeTo(bufferedRequestBody);bufferedRequestBody.close();realChain.eventListener().requestBodyEnd(realChain.call(),requestBodyOut.successfulCount);}elseif(!connection.isMultiplexed()){// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection// from being reused. 
Otherwise we're still obligated to transmit the request body to// leave the connection in a consistent state.streamAllocation.noNewStreams();}}httpCodec.finishRequest();if(responseBuilder==null){realChain.eventListener().responseHeadersStart(realChain.call());responseBuilder=httpCodec.readResponseHeaders(false);}Responseresponse=responseBuilder.request(request).handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();intcode=response.code();if(code==100){// server sent a 100-continue even though we did not request one.// try again to read the actual responseresponseBuilder=httpCodec.readResponseHeaders(false);response=responseBuilder.request(request).handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build();code=response.code();}realChain.eventListener().responseHeadersEnd(realChain.call(),response);if(forWebSocket&&code==101){// Connection is upgrading, but we need to ensure interceptors see a non-null response body.response=response.newBuilder().body(Util.EMPTY_RESPONSE).build();}else{response=response.newBuilder().body(httpCodec.openResponseBody(response)).build();}if("close".equalsIgnoreCase(response.request().header("Connection"))||"close".equalsIgnoreCase(response.header("Connection"))){streamAllocation.noNewStreams();}if((code==204||code==205)&&response.body().contentLength()>0){thrownewProtocolException("HTTP "+code+" had non-zero Content-Length: "+response.body().contentLength());}returnresponse;}