INTERNAL: make piped insert operations process synchronously#795
INTERNAL: make piped insert operations process synchronously#795oliviarla wants to merge 1 commit intonaver:developfrom
Conversation
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
9eb5496 to
076d02b
Compare
90a7ee5 to
df233d9
Compare
600f2b2 to
d232a01
Compare
|
@jhpark816 리뷰 반영했습니다. |
|
@uhm0311 리뷰 바랍니다. |
This comment was marked as resolved.
This comment was marked as resolved.
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/internal/PipedCollectionFuture.java
Outdated
Show resolved
Hide resolved
src/main/java/net/spy/memcached/protocol/ascii/SingleKeyPipeOperationImpl.java
Show resolved
Hide resolved
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
|
@uhm0311 @jhpark816 future.cancel()
public boolean cancel(boolean ign) {
for (int i = currentOpIdx; i < ops.size(); i++) {
if (ops.get(i).cancel("by application.")) {
return true;
}
}
return false;
}
public boolean isCancelled() {
for (int i=0;i<ops.size();i++) {
if (ops.get(i).isCancelled()) {
return true;
}
}
return false;
}callback.complete()
public void complete() {
if (rv.getOperationStatus().isSuccess()) {
Operation nextOp = rv.getNextOp(); // next op가 있다면 op 객체를, 없다면 Null 반환
if (nextOp != null && !nextOp.isCancelled()) {
addOp(key, nextOp);
rv.setCurrentOpIdx(nextOpIdx);
} else {
latch.countDown();
}
} else {
// ...
if (nextIndex > 0) {
rv.addEachResult(nextIndex, new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
latch.countDown();
}
} |
| if (nextIndex > 0) { | ||
| rv.addEachResult(nextIndex, | ||
| new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); | ||
| } |
There was a problem hiding this comment.
@oliviarla
complete()는 cancel 시에도 호출됩니다.
이 경우, rv.getOperationStatus().isSuccess()는 false이므로, NOT_EXECUTED 상태를 추가하게 되는 데요.
cancel 시에 NOT_EXECUTED 상태를 추가하는 것이 맞는 지 검토 바랍니다.
아래는 참고 바랍니다.
- lastExecutedIndex는 gotStatus() 호출에서 증가하게 되는 데요.
- cancel 시에 현재의 lastExecutedIndex 기존으로 NOT_EXECUTED 추가하였다가
- 나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.
There was a problem hiding this comment.
나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.
이 경우는 발생하지 않을 것 같습니다. gotStatus가 호출되려면 op가 이미 node의 queue에 추가되어 있어야 하는데, NOT_EXECUTED가 추가되는 시점에는 op가 node의 queue에 있을 수 없으므로 gotStatus 역시 호출될 수 없습니다.
There was a problem hiding this comment.
나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.
이 경우는 발생하지 않을 것 같습니다. gotStatus가 호출되려면 op가 이미 node의 queue에 추가되어 있어야 하는데, NOT_EXECUTED가 추가되는 시점에는 op가 node의 queue에 있을 수 없으므로 gotStatus 역시 호출될 수 없습니다.
이 부분에 관한 코멘트합니다.
해당 op가 readQ에 있는 상태에서 cancel 되었다고 가정합니다.
해당 op가 cancel 되더라도 readQ에 있으면서 response 문자열을 읽어 처리하게 되므로
gotStatus() 호출하게 될 것 같습니다. 어떤가요?
따라서, 해당 op 상태에 따라 판단해야 할 것 같습니다. 검토 바랍니다.
- (currentItemIdx + 1) < itemCount
READING상태 : itemCount 만큼 결과가 받을 예정이므로,- (currentItemIdx+1) ~ (ItemCount-1) : UNDEFINED status 추가
- 마지막(index = ItemCount)에 STOPPED status 추가
- 그 외의 상태 : 현재 op는 수행되지 않는다는 것을 알 수 있음
- this.gotStatus(currentItemIdx+1, StoppedStatus);
- (currentItemIdx + 1) >= itemCount
- (opIdx + 1) >= ops.size()
- Don't add a failed Status
- else
- this.gotStatus(currentItemIdx+1, StoppedStatus);
- (opIdx + 1) >= ops.size()
그리고, currentItemIdx => processedItemIdx 용어가 나은 것 같습니다.
There was a problem hiding this comment.
@oliviarla
Stopped status 추가하는 부분은 다시 검토한 후에 코멘트 남기겠습니다.
|
@uhm0311 @jhpark816 |
c833f44 to
0129d4e
Compare
|
리뷰 반영했습니다. |
| rv.addOperation(op); | ||
| } | ||
| addOp(key, rv.getOperation(0)); | ||
| rv.incrCurrentOpIdx(); |
There was a problem hiding this comment.
이전 코멘트를 가정하고 아래와 같이 호출할 수 있습니다.
addOp(key, rv.GetFirstOp());| } catch (IndexOutOfBoundsException e) { | ||
| return null; | ||
| } | ||
| } |
There was a problem hiding this comment.
아래 예시의 getFirstOp(), getNextOp()가 있으면, 다음 method는 제거할 수 있습니다.
- Operation getOperation(int idx)
- void incrCurrentOpIdx()
public Operation getFirstOp() {
currentOpIdx = 0;
return ops.get(currentOpIdx);
}
public Operation getNextOp() {
if (isCancelled() || (currentOpIdx + 1) >= ops.size()) {
return null;
}
currentOpIdx += 1;
return ops.get(currentOpIdx);
}추가 코멘트로,
- getFirstOp()도 제거할 수 있는 데요.
getFirstOp()와 getNextOp()를 모두 사용하는 것이 나은 지 아니면
getNextOp() 하나만 사용하는 것이 나은 지를 검토 바랍니다. - Future에서 isCancelled() 수행 cost가 적게 구현할 필요가 있습니다.
| Operation nextOp = rv.getNextOp(); | ||
| if (nextOp != null && !nextOp.isCancelled()) { | ||
| addOp(key, nextOp); | ||
| rv.incrCurrentOpIdx(); |
There was a problem hiding this comment.
이전 코멘트를 가정하면, 위의 rv.incrCurrentOpIdx(); 라인을 제거할 수 있습니다.
| } | ||
| } | ||
| return rv; | ||
| return false; |
There was a problem hiding this comment.
cancel() 함수를 엄밀하게 살펴보니,
cancel() 2회 호출 시에 기존과 비교하여 아래의 차이가 있습니다.
- 기존) 2번째 호출에서 항상 false가 리턴됩니다.
- 변경) 2번째 호출에서 다시 next op에 대해 cancel() 수행되고 true 리턴할 수 있습니다.
아래와 같이 cancelled 변수를 두는 구현이 명확하고 나은 것 같습니다.
public boolean cancel(boolean ign) {
if (cancelled) {
return false;
}
for (int i = currentOpIdx; i < ops.size(); i++) {
cancelled = ops.get(i).cancel("by application.");
if (cancelled) {
return true;
}
}
return false;
}
public boolean isCancelled() {
return cancelled;
}| } else { | ||
| // If this operation has been errored or cancelled, some items in the operation may not be executed. | ||
| // The first item that is not executed will have the STOPPED flag. | ||
| if ((currentItemIdx + 1) < itemCount || (opIdx + 1 < insertList.size())) { |
There was a problem hiding this comment.
(currentItemIdx + 1) < itemCount 와 일관된 아래 형태로 하시죠.
- (opIdx + 1 < insertList.size()) => (opIdx + 1) < insertList.size()
| if (nextIndex > 0) { | ||
| rv.addEachResult(nextIndex, | ||
| new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED)); | ||
| } |
There was a problem hiding this comment.
나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.
이 경우는 발생하지 않을 것 같습니다. gotStatus가 호출되려면 op가 이미 node의 queue에 추가되어 있어야 하는데, NOT_EXECUTED가 추가되는 시점에는 op가 node의 queue에 있을 수 없으므로 gotStatus 역시 호출될 수 없습니다.
이 부분에 관한 코멘트합니다.
해당 op가 readQ에 있는 상태에서 cancel 되었다고 가정합니다.
해당 op가 cancel 되더라도 readQ에 있으면서 response 문자열을 읽어 처리하게 되므로
gotStatus() 호출하게 될 것 같습니다. 어떤가요?
따라서, 해당 op 상태에 따라 판단해야 할 것 같습니다. 검토 바랍니다.
- (currentItemIdx + 1) < itemCount
READING상태 : itemCount 만큼 결과가 받을 예정이므로,- (currentItemIdx+1) ~ (ItemCount-1) : UNDEFINED status 추가
- 마지막(index = ItemCount)에 STOPPED status 추가
- 그 외의 상태 : 현재 op는 수행되지 않는다는 것을 알 수 있음
- this.gotStatus(currentItemIdx+1, StoppedStatus);
- (currentItemIdx + 1) >= itemCount
- (opIdx + 1) >= ops.size()
- Don't add a failed Status
- else
- this.gotStatus(currentItemIdx+1, StoppedStatus);
- (opIdx + 1) >= ops.size()
그리고, currentItemIdx => processedItemIdx 용어가 나은 것 같습니다.
| @Override | ||
| protected OperationStatus checkStatus(String line) { | ||
| return null; | ||
| } |
There was a problem hiding this comment.
checkStatus() 관한 질문입니다.
추상 메소드를 첫번째의 상속 클래스(SingleKeyPipeOperationImpl)에서 구현하지 않고,
SingleKeyPipeOperationImpl를 상속한 클래스에서 구현하는 것은 가능한가요?
| private final List<Operation> ops = new ArrayList<>(); | ||
| private final AtomicReference<CollectionOperationStatus> operationStatus | ||
| = new AtomicReference<>(null); | ||
| private int currentOpIdx = -1; |
There was a problem hiding this comment.
이 값이 -1인 상태에서 일부 함수에서 for 루프를 돌면 IndexOutOfBoundsException이 발생하게 됩니다.
이에 관한 적절한 처리가 필요해 보입니다.
|
@uhm0311 @jhpark816 따라서 Future에 해당 값들을 두지 않고, 기존에 수행했던대로 Future에는 현재 실행중이거나 실행 완료된 Operation만을 담는 List를 갖도록 변경하는 PR을 다시 올리려고 합니다. 기존에서 현 상태로 구조가 바뀌게 된 원인이 future.cancel() 호출 시 nextOp까지 cancel() 해보지 않고 실패 처리하여, nextOp에 쉽게 접근해 cancel() 호출을 하기 위함이었습니다. 하지만 이 문제는 Future에서 직접 nextOp에 접근하는 방식 대신, 하나의 Operation에 next Operation 변수를 담아 Operation에 cancel 요청이 들어왔을 때 next Operation cancel()을 내부적으로 호출할 수 있습니다. 구현은 완료된 상태이고, 자체 리뷰 거쳐 내일 다시 PR 올리겠습니다. |
🔗 Related Issue
⌨️ What I did
lop/sop/mop/bop piped insert에 한해 적용하는 PR입니다.동작 과정
500개 아이템 단위로 Operation 객체를 나누어 비동기로 일제히 요청을 보내던 것을 동기 방식으로 하나씩 보내어 Arcus 서버에 과도한 부하가 들어가는 것을 방지하고, 실패 시 다음 Operation을 수행하지 않도록 합니다.
이전 operation이 성공해야만 다음 operation이 writeQueue와 Future의 operation list에 추가됩니다.
만약 CLIENT_ERROR, SERVER_ERROR 실패가 발생하면 그 즉시 latch.countdown을 호출해 남은 operation을 수행하지 않습니다.
OVERFLOWED, OUT_OF_RANGE 같은 실패가 발생하면 이후 operation의 첫 command가 NOT_EXECUTED 상태임을 future의 failedResult에 추가합니다.
NOT_EXECUTED 이후의 모든 연산은 실행되지 않은 것입니다. (CANCELED가 아닌 NOT_EXECUTED를 사용하는 이유는 진짜 cancel 상황과의 혼용을 막고자 하기 위함입니다.)
본 PR 이전에 pipe 관련 클래스 구조 변경이 있었고, 이에 맞추어 SingleKeyPipedOperationImpl 클래스를 추가합니다.