3131 * Memcached node for the ASCII protocol.
3232 */
3333public final class AsciiMemcachedNodeImpl extends TCPMemcachedNodeImpl {
34+
35+ private static final int GET_BULK_CHUNK_SIZE = 200 ;
36+
3437 public AsciiMemcachedNodeImpl (String name ,
3538 SocketAddress sa ,
3639 int bufSize , BlockingQueue <Operation > rq ,
@@ -42,34 +45,39 @@ public AsciiMemcachedNodeImpl(String name,
4245
4346 @ Override
4447 protected void optimize () {
45- // make sure there are at least two get operations in a row before
46- // attempting to optimize them.
4748 Operation nxtOp = writeQ .peek ();
48- if (nxtOp instanceof GetOperation && nxtOp .getAPIType () != APIType .MGET ) {
49- optimizedOp = writeQ .remove ();
50- nxtOp = writeQ .peek ();
51- if (nxtOp instanceof GetOperation && nxtOp .getAPIType () != APIType .MGET ) {
52- OptimizedGetImpl og = new OptimizedGetImpl (
53- (GetOperation ) optimizedOp );
54- optimizedOp = og ;
49+ if (!(nxtOp instanceof GetOperation ) || nxtOp .getAPIType () == APIType .MGET ||
50+ ((GetOperation ) nxtOp ).getKeys ().size () > GET_BULK_CHUNK_SIZE ) {
51+ return ;
52+ }
5553
56- do {
57- GetOperationImpl o = (GetOperationImpl ) writeQ .remove ();
58- if (!o .isCancelled ()) {
59- og .addOperation (o );
60- }
61- nxtOp = writeQ .peek ();
62- } while (nxtOp instanceof GetOperation &&
63- nxtOp .getAPIType () != APIType .MGET );
54+ int cnt = ((GetOperation ) nxtOp ).getKeys ().size ();
55+ optimizedOp = writeQ .remove ();
56+ nxtOp = writeQ .peek ();
57+ OptimizedGetImpl og = null ;
6458
65- // Initialize the new mega get
66- optimizedOp .initialize ();
67- assert optimizedOp .getState () == OperationState .WRITE_QUEUED ;
68- ProxyCallback pcb = (ProxyCallback ) og .getCallback ();
69- getLogger ().debug ("Set up %s with %s keys and %s callbacks" ,
70- this , pcb .numKeys (), pcb .numCallbacks ());
59+ while (nxtOp instanceof GetOperation && nxtOp .getAPIType () != APIType .MGET ) {
60+ if (og == null ) {
61+ og = (OptimizedGetImpl ) optimizedOp ;
62+ assert og != null ;
63+ }
64+ cnt += ((GetOperation ) nxtOp ).getKeys ().size ();
65+ if (cnt > GET_BULK_CHUNK_SIZE ) {
66+ break ;
7167 }
68+ GetOperationImpl currentOp = (GetOperationImpl ) writeQ .remove ();
69+ if (!currentOp .isCancelled ()) {
70+ og .addOperation (currentOp );
71+ }
72+ nxtOp = writeQ .peek ();
73+ }
74+ // Initialize the new mega get
75+ if (og != null ) {
76+ optimizedOp .initialize ();
77+ assert optimizedOp .getState () == OperationState .WRITE_QUEUED ;
78+ ProxyCallback pcb = (ProxyCallback ) optimizedOp .getCallback ();
79+ getLogger ().debug ("Set up %s with %s keys and %s callbacks" ,
80+ this , pcb .numKeys (), pcb .numCallbacks ());
7281 }
7382 }
74-
7583}
0 commit comments