3131 * Memcached node for the ASCII protocol.
3232 */
3333public final class AsciiMemcachedNodeImpl extends TCPMemcachedNodeImpl {
34+
35+ private static final int GET_BULK_CHUNK_SIZE = 200 ;
36+
3437 public AsciiMemcachedNodeImpl (String name ,
3538 SocketAddress sa ,
3639 int bufSize , BlockingQueue <Operation > rq ,
@@ -45,30 +48,42 @@ protected void optimize() {
4548 // make sure there are at least two get operations in a row before
4649 // attempting to optimize them.
4750 Operation nxtOp = writeQ .peek ();
48- if (nxtOp instanceof GetOperation && nxtOp .getAPIType () != APIType .MGET ) {
49- optimizedOp = writeQ .remove ();
50- nxtOp = writeQ .peek ();
51- if (nxtOp instanceof GetOperation && nxtOp .getAPIType () != APIType .MGET ) {
52- OptimizedGetImpl og = new OptimizedGetImpl (
53- (GetOperation ) optimizedOp );
54- optimizedOp = og ;
51+ if (!(nxtOp instanceof GetOperation ) || nxtOp .getAPIType () != APIType .MGET ) {
52+ return ;
53+ }
54+
55+ int cnt = ((GetOperation ) nxtOp ).getKeys ().size ();
56+ if (cnt > GET_BULK_CHUNK_SIZE ) {
57+ return ;
58+ }
5559
56- do {
57- GetOperationImpl o = (GetOperationImpl ) writeQ .remove ();
58- if (!o .isCancelled ()) {
59- og .addOperation (o );
60- }
61- nxtOp = writeQ .peek ();
62- } while (nxtOp instanceof GetOperation &&
63- nxtOp .getAPIType () != APIType .MGET );
60+ optimizedOp = writeQ .remove ();
61+ nxtOp = writeQ .peek ();
62+ OptimizedGetImpl og = null ;
6463
65- // Initialize the new mega get
66- optimizedOp .initialize ();
67- assert optimizedOp .getState () == OperationState .WRITE_QUEUED ;
68- ProxyCallback pcb = (ProxyCallback ) og .getCallback ();
69- getLogger ().debug ("Set up %s with %s keys and %s callbacks" ,
70- this , pcb .numKeys (), pcb .numCallbacks ());
64+ while (nxtOp instanceof GetOperation && nxtOp .getAPIType () != APIType .MGET ) {
65+ if (og == null ) {
66+ og = new OptimizedGetImpl ((GetOperation ) optimizedOp );
67+ optimizedOp = og ;
68+ }
69+ GetOperationImpl currentOp = (GetOperationImpl ) writeQ .remove ();
70+ cnt += currentOp .getKeys ().size ();
71+ if (cnt > GET_BULK_CHUNK_SIZE ) {
72+ break ;
73+ }
74+ if (!currentOp .isCancelled ()) {
75+ og .addOperation (currentOp );
7176 }
77+ nxtOp = writeQ .peek ();
78+ }
79+
80+ if (og != null ) {
81+ // Initialize the new mega get
82+ optimizedOp .initialize ();
83+ assert optimizedOp .getState () == OperationState .WRITE_QUEUED ;
84+ ProxyCallback pcb = (ProxyCallback ) og .getCallback ();
85+ getLogger ().debug ("Set up %s with %s keys and %s callbacks" ,
86+ this , pcb .numKeys (), pcb .numCallbacks ());
7287 }
7388 }
7489
0 commit comments