Skip to content

Comments

INTERNAL: make piped insert operations process synchronously#795

Closed
oliviarla wants to merge 1 commit intonaver:developfrom
oliviarla:pipe2
Closed

INTERNAL: make piped insert operations process synchronously#795
oliviarla wants to merge 1 commit intonaver:developfrom
oliviarla:pipe2

Conversation

@oliviarla
Copy link
Collaborator

@oliviarla oliviarla commented Aug 19, 2024

🔗 Related Issue

⌨️ What I did

  • lop/sop/mop/bop piped insert에 한해 적용하는 PR입니다.

동작 과정

  • 500개 아이템 단위로 Operation 객체를 나누어 비동기로 일제히 요청을 보내던 것을 동기 방식으로 하나씩 보내어 Arcus 서버에 과도한 부하가 들어가는 것을 방지하고, 실패 시 다음 Operation을 수행하지 않도록 합니다.

  • 이전 operation이 성공해야만 다음 operation이 writeQueue와 Future의 operation list에 추가됩니다.

  • 만약 CLIENT_ERROR, SERVER_ERROR 실패가 발생하면 그 즉시 latch.countdown을 호출해 남은 operation을 수행하지 않습니다.

  • OVERFLOWED, OUT_OF_RANGE 같은 실패가 발생하면 이후 operation의 첫 command가 NOT_EXECUTED 상태임을 future의 failedResult에 추가합니다.

  • NOT_EXECUTED 이후의 모든 연산은 실행되지 않은 것입니다. (CANCELED가 아닌 NOT_EXECUTED를 사용하는 이유는 진짜 cancel 상황과의 혼용을 막고자 하기 위함입니다.)

  • 본 PR 이전에 pipe 관련 클래스 구조 변경이 있었고, 이에 맞추어 SingleKeyPipedOperationImpl 클래스를 추가합니다.

    • PipedOperationImpl 자체를 수정할 경우 bulk insert까지 영향이 가기 때문에, single key 전용 operation 클래스를 추가한 후 handleLine 메서드를 수정했습니다.
    • 이 클래스를 CollectionPipedInsertOperationImpl 가 상속받도록 합니다. (추후 CollectionPipedUpdateOperationImpl 등 클래스들도 상속받을 예정입니다.)
    • PipedOperationImpl 클래스는 파이프를 사용하는 모든 Operation에 대한 로직을 관리하고, single key나 multi key에 대한 처리를 이 클래스와 같은 별도 클래스에 두어 분리된 형태로 관리할 수 있습니다.

@oliviarla oliviarla requested review from jhpark816 and uhm0311 August 19, 2024 06:37
oliviarla referenced this pull request in oliviarla/arcus-java-client Aug 19, 2024
@jhpark816 jhpark816 removed their request for review August 19, 2024 06:42
@oliviarla oliviarla requested review from jhpark816 and removed request for jhpark816 August 19, 2024 06:43
@oliviarla oliviarla self-assigned this Aug 19, 2024
@oliviarla oliviarla force-pushed the pipe2 branch 3 times, most recently from 9eb5496 to 076d02b Compare August 23, 2024 11:35
uhm0311
uhm0311 previously approved these changes Aug 27, 2024
@oliviarla oliviarla requested a review from jhpark816 August 28, 2024 02:43
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

일부 리뷰

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

@oliviarla oliviarla force-pushed the pipe2 branch 4 times, most recently from 600f2b2 to d232a01 Compare September 6, 2024 09:15
@oliviarla
Copy link
Collaborator Author

@jhpark816 리뷰 반영했습니다.

@jhpark816
Copy link
Collaborator

@uhm0311 리뷰 바랍니다.

@uhm0311

This comment was marked as resolved.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

일부 리뷰

@oliviarla

This comment was marked as outdated.

@uhm0311

This comment was marked as outdated.

@oliviarla

This comment was marked as outdated.

@uhm0311

This comment was marked as outdated.

@oliviarla

This comment was marked as outdated.

@oliviarla
Copy link
Collaborator Author

oliviarla commented Feb 27, 2025

@uhm0311 @jhpark816
이전 코멘트가 너무 복잡한 방향으로 진행되어 @jhpark816 님의 피드백을 받고 아래와 같이 구상을 변경하였습니다.

future.cancel()

  • op.cancel이 실패하는 경우는 이미 op가 complete 처리되고 있는 경우 뿐이다. 따라서 complete 처리된 op는 스킵하고 다음 op를 cancel 시도한다.
  • 현재 수행중인 op부터 순회하며 cancel이 성공하였다면 즉시 성공 응답을 반환하도록 한다.
  • cancel된 op가 하나라도 있다면 future는 cancel된 상태이다.
public boolean cancel(boolean ign) {
  for (int i = currentOpIdx; i < ops.size(); i++) {
    if (ops.get(i).cancel("by application.")) {
      return true;
    }
  }
  return false;
}

public boolean isCancelled() {
  for (int i=0;i<ops.size();i++) {
    if (ops.get(i).isCancelled()) {
      return true;
    }
  }
  return false;
}

callback.complete()

  • nextOp를 cancel 시키는 동시에 addOp하는 경우 cancel 상태이면서 input queue에 추가될 수 있으나, write queue에서 제외되므로 문제가 발생하지 않는다.
public void complete() {
  if (rv.getOperationStatus().isSuccess()) {
    Operation nextOp = rv.getNextOp(); // next op가 있다면 op 객체를, 없다면 Null 반환
    if (nextOp != null && !nextOp.isCancelled()) {
      addOp(key, nextOp);
      rv.setCurrentOpIdx(nextOpIdx);
    } else {
      latch.countDown();
    }
  } else {
    // ...
    if (nextIndex > 0) {
      rv.addEachResult(nextIndex, new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
    }
    latch.countDown();
  }
}

if (nextIndex > 0) {
rv.addEachResult(nextIndex,
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla
complete()는 cancel 시에도 호출됩니다.
이 경우, rv.getOperationStatus().isSuccess()는 false이므로, NOT_EXECUTED 상태를 추가하게 되는 데요.
cancel 시에 NOT_EXECUTED 상태를 추가하는 것이 맞는 지 검토 바랍니다.
아래는 참고 바랍니다.

  • lastExecutedIndex는 gotStatus() 호출에서 증가하게 되는 데요.
  • cancel 시에 현재의 lastExecutedIndex 기존으로 NOT_EXECUTED 추가하였다가
  • 나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
    즉, NOT_EXECUTED가 제거될 수 있습니다.

Copy link
Collaborator Author

@oliviarla oliviarla Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.

이 경우는 발생하지 않을 것 같습니다. gotStatus가 호출되려면 op가 이미 node의 queue에 추가되어 있어야 하는데, NOT_EXECUTED가 추가되는 시점에는 op가 node의 queue에 있을 수 없으므로 gotStatus 역시 호출될 수 없습니다.

Copy link
Collaborator

@jhpark816 jhpark816 Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.

이 경우는 발생하지 않을 것 같습니다. gotStatus가 호출되려면 op가 이미 node의 queue에 추가되어 있어야 하는데, NOT_EXECUTED가 추가되는 시점에는 op가 node의 queue에 있을 수 없으므로 gotStatus 역시 호출될 수 없습니다.

이 부분에 관한 코멘트합니다.

해당 op가 readQ에 있는 상태에서 cancel 되었다고 가정합니다.
해당 op가 cancel 되더라도 readQ에 있으면서 response 문자열을 읽어 처리하게 되므로
gotStatus() 호출하게 될 것 같습니다. 어떤가요?

따라서, 해당 op 상태에 따라 판단해야 할 것 같습니다. 검토 바랍니다.

  • (currentItemIdx + 1) < itemCount
    • READING 상태 : itemCount 만큼 결과가 받을 예정이므로,
      • (currentItemIdx+1) ~ (ItemCount-1) : UNDEFINED status 추가
      • 마지막(index = ItemCount)에 STOPPED status 추가
    • 그 외의 상태 : 현재 op는 수행되지 않는다는 것을 알 수 있음
      • this.gotStatus(currentItemIdx+1, StoppedStatus);
  • (currentItemIdx + 1) >= itemCount
    • (opIdx + 1) >= ops.size()
      • Don't add a failed Status
    • else
      • this.gotStatus(currentItemIdx+1, StoppedStatus);

그리고, currentItemIdx => processedItemIdx 용어가 나은 것 같습니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla
Stopped status 추가하는 부분은 다시 검토한 후에 코멘트 남기겠습니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla
위의 코멘트 수정하였으니, 다시 확인 바랍니다.

@oliviarla
Copy link
Collaborator Author

@uhm0311 @jhpark816
이제부터 다시 리뷰해주시면 됩니다.

@oliviarla oliviarla force-pushed the pipe2 branch 2 times, most recently from c833f44 to 0129d4e Compare February 28, 2025 11:45
Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

@oliviarla
Copy link
Collaborator Author

리뷰 반영했습니다.

Copy link
Collaborator

@jhpark816 jhpark816 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

리뷰 완료

rv.addOperation(op);
}
addOp(key, rv.getOperation(0));
rv.incrCurrentOpIdx();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이전 코멘트를 가정하고 아래와 같이 호출할 수 있습니다.

addOp(key, rv.GetFirstOp());

} catch (IndexOutOfBoundsException e) {
return null;
}
}
Copy link
Collaborator

@jhpark816 jhpark816 Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아래 예시의 getFirstOp(), getNextOp()가 있으면, 다음 method는 제거할 수 있습니다.

  • Operation getOperation(int idx)
  • void incrCurrentOpIdx()
  public Operation getFirstOp() {
    currentOpIdx = 0;
    return ops.get(currentOpIdx);
  }

  public Operation getNextOp() {
    if (isCancelled() || (currentOpIdx + 1) >= ops.size()) {
      return null;
    }
    currentOpIdx += 1;
    return ops.get(currentOpIdx);
  }

추가 코멘트로,

  • getFirstOp()도 제거할 수 있는 데요.
    getFirstOp()와 getNextOp()를 모두 사용하는 것이 나은 지 아니면
    getNextOp() 하나만 사용하는 것이 나은 지를 검토 바랍니다.
  • Future에서 isCancelled() 수행 cost가 적게 구현할 필요가 있습니다.

Operation nextOp = rv.getNextOp();
if (nextOp != null && !nextOp.isCancelled()) {
addOp(key, nextOp);
rv.incrCurrentOpIdx();
Copy link
Collaborator

@jhpark816 jhpark816 Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이전 코멘트를 가정하면, 위의 rv.incrCurrentOpIdx(); 라인을 제거할 수 있습니다.

}
}
return rv;
return false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancel() 함수를 엄밀하게 살펴보니,
cancel() 2회 호출 시에 기존과 비교하여 아래의 차이가 있습니다.

  • 기존) 2번째 호출에서 항상 false가 리턴됩니다.
  • 변경) 2번째 호출에서 다시 next op에 대해 cancel() 수행되고 true 리턴할 수 있습니다.

아래와 같이 cancelled 변수를 두는 구현이 명확하고 나은 것 같습니다.

  public boolean cancel(boolean ign) {
    if (cancelled) {
      return false;
    }
    for (int i = currentOpIdx; i < ops.size(); i++) {
      cancelled = ops.get(i).cancel("by application.");
      if (cancelled) {
        return true;
      }
    }
    return false;
  }

  public boolean isCancelled() {
    return cancelled;
  }

} else {
// If this operation has been errored or cancelled, some items in the operation may not be executed.
// The first item that is not executed will have the STOPPED flag.
if ((currentItemIdx + 1) < itemCount || (opIdx + 1 < insertList.size())) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(currentItemIdx + 1) < itemCount 와 일관된 아래 형태로 하시죠.

  • (opIdx + 1 < insertList.size()) => (opIdx + 1) < insertList.size()

if (nextIndex > 0) {
rv.addEachResult(nextIndex,
new CollectionOperationStatus(false, "NOT_EXECUTED", CollectionResponse.NOT_EXECUTED));
}
Copy link
Collaborator

@jhpark816 jhpark816 Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

나중에 gotStatus() 호출에 의하여 해당 index의 status를 교체할 수 있게 됩니다.
즉, NOT_EXECUTED가 제거될 수 있습니다.

이 경우는 발생하지 않을 것 같습니다. gotStatus가 호출되려면 op가 이미 node의 queue에 추가되어 있어야 하는데, NOT_EXECUTED가 추가되는 시점에는 op가 node의 queue에 있을 수 없으므로 gotStatus 역시 호출될 수 없습니다.

이 부분에 관한 코멘트합니다.

해당 op가 readQ에 있는 상태에서 cancel 되었다고 가정합니다.
해당 op가 cancel 되더라도 readQ에 있으면서 response 문자열을 읽어 처리하게 되므로
gotStatus() 호출하게 될 것 같습니다. 어떤가요?

따라서, 해당 op 상태에 따라 판단해야 할 것 같습니다. 검토 바랍니다.

  • (currentItemIdx + 1) < itemCount
    • READING 상태 : itemCount 만큼 결과가 받을 예정이므로,
      • (currentItemIdx+1) ~ (ItemCount-1) : UNDEFINED status 추가
      • 마지막(index = ItemCount)에 STOPPED status 추가
    • 그 외의 상태 : 현재 op는 수행되지 않는다는 것을 알 수 있음
      • this.gotStatus(currentItemIdx+1, StoppedStatus);
  • (currentItemIdx + 1) >= itemCount
    • (opIdx + 1) >= ops.size()
      • Don't add a failed Status
    • else
      • this.gotStatus(currentItemIdx+1, StoppedStatus);

그리고, currentItemIdx => processedItemIdx 용어가 나은 것 같습니다.

@Override
protected OperationStatus checkStatus(String line) {
return null;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkStatus() 관한 질문입니다.
추상 메소드를 첫번째의 상속 클래스(SingleKeyPipeOperationImpl)에서 구현하지 않고,
SingleKeyPipeOperationImpl를 상속한 클래스에서 구현하는 것은 가능한가요?

private final List<Operation> ops = new ArrayList<>();
private final AtomicReference<CollectionOperationStatus> operationStatus
= new AtomicReference<>(null);
private int currentOpIdx = -1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이 값이 -1인 상태에서 일부 함수에서 for 루프를 돌면 IndexOutOfBoundsException이 발생하게 됩니다.
이에 관한 적절한 처리가 필요해 보입니다.

@oliviarla
Copy link
Collaborator Author

oliviarla commented Mar 13, 2025

@uhm0311 @jhpark816
현재 Future에 currentOpIdx, 모든 Operation을 담은 List 를 두다보니 해당 값을 callback에서도 사용하고 future에서도 사용하여, 동시성 문제가 발생할 여지가 높아져있습니다. 또한 Future는 값을 기다리는 역할만을 갖는 것이 단순하고 추후 Future를 통합할 때도 유용할 것이라는 생각이 들었습니다.

따라서 Future에 해당 값들을 두지 않고, 기존에 수행했던대로 Future에는 현재 실행중이거나 실행 완료된 Operation만을 담는 List를 갖도록 변경하는 PR을 다시 올리려고 합니다.

기존에서 현 상태로 구조가 바뀌게 된 원인이 future.cancel() 호출 시 nextOp까지 cancel() 해보지 않고 실패 처리하여, nextOp에 쉽게 접근해 cancel() 호출을 하기 위함이었습니다.

하지만 이 문제는 Future에서 직접 nextOp에 접근하는 방식 대신, 하나의 Operation에 next Operation 변수를 담아 Operation에 cancel 요청이 들어왔을 때 next Operation cancel()을 내부적으로 호출할 수 있습니다. 구현은 완료된 상태이고, 자체 리뷰 거쳐 내일 다시 PR 올리겠습니다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants