Commit 643746a: Fix stream

merrymercy committed Dec 8, 2024
1 parent: cc85895

Showing 1 changed file with 28 additions and 31 deletions.

python/sglang/srt/managers/scheduler.py (59 changes: 28 additions & 31 deletions)
@@ -1194,22 +1194,16 @@ def stream_output(self, reqs: List[Req], skip_req: Optional[Req] = None):
             output_skip_special_tokens = []
             output_spaces_between_special_tokens = []
             output_no_stop_trim = []
-        else:  # embedding or reward model
-            output_embeddings = []
 
-        is_stream_iter = self.forward_ct_decode % self.stream_interval == 0
-
-        for req in reqs:
-            if req is skip_req:
-                continue
+            for req in reqs:
+                if req is skip_req:
+                    continue
 
-            # TODO(lianmin): revisit this for overlap + retract + stream
-            if req.finished() or (
-                req.stream and (is_stream_iter or len(req.output_ids) == 1)
-            ):
-                output_rids.append(req.rid)
-                output_finished_reason.append(req.finished_reason)
-                if self.is_generation:
+                # TODO(lianmin): revisit this for overlap + retract + stream
+                is_stream_iter = len(req.output_ids) % self.stream_interval == 0
+                if req.finished() or (req.stream and is_stream_iter):
+                    output_rids.append(req.rid)
+                    output_finished_reason.append(req.finished_reason)
                     output_vids.append(req.vid)
                     decoded_texts.append(req.decoded_text)
                     read_ids, read_offset = req.init_incremental_detokenize()
@@ -1251,16 +1245,9 @@ def stream_output(self, reqs: List[Req], skip_req: Optional[Req] = None):
                             req.normalized_prompt_logprob,
                         )
                     output_meta_info.append(meta_info)
-                else:  # embedding or reward model
-                    output_embeddings.append(req.embedding)
-                    meta_info = {
-                        "prompt_tokens": len(req.origin_input_ids),
-                    }
-                    output_meta_info.append(meta_info)
 
-        # Send to detokenizer
-        if output_rids:
-            if self.is_generation:
+            # Send to detokenizer
+            if output_rids:
                 self.send_to_detokenizer.send_pyobj(
                     BatchTokenIDOut(
                         output_rids,
@@ -1276,15 +1263,25 @@ def stream_output(self, reqs: List[Req], skip_req: Optional[Req] = None):
                         output_no_stop_trim,
                     )
                 )
-            else:  # embedding or reward model
-                self.send_to_detokenizer.send_pyobj(
-                    BatchEmbeddingOut(
-                        output_rids,
-                        output_embeddings,
-                        output_meta_info,
-                        output_finished_reason,
-                    )
+        else:  # embedding or reward model
+            output_embeddings = []
+            for req in reqs:
+                assert req.finished()
+                output_rids.append(req.rid)
+                output_finished_reason.append(req.finished_reason)
+                output_embeddings.append(req.embedding)
+                meta_info = {
+                    "prompt_tokens": len(req.origin_input_ids),
+                }
+                output_meta_info.append(meta_info)
+            self.send_to_detokenizer.send_pyobj(
+                BatchEmbeddingOut(
+                    output_rids,
+                    output_embeddings,
+                    output_meta_info,
+                    output_finished_reason,
+                )
                 )
+            )
 
     def prepare_dp_attn_batch(self, local_batch: ScheduleBatch):
         # Check if other DP workers have running batches
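In the old code, stream readiness was evaluated once per scheduler step from a global decode counter (self.forward_ct_decode % self.stream_interval), with a first-token special case, and generation and embedding requests shared one loop. After this commit, each generation request is gated by its own output length (len(req.output_ids) % self.stream_interval), and embedding/reward requests take a separate branch that only emits finished requests as one BatchEmbeddingOut. Below is a minimal runnable sketch of the new gating rule; Req, should_emit, and stream_interval here are illustrative stand-ins, not sglang's actual API.

    # A minimal sketch, assuming a Req with `stream`, `output_ids`, and a
    # `finished` flag (sglang's real Req is richer; finished() is a method there).
    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class Req:
        rid: str
        stream: bool
        output_ids: List[int] = field(default_factory=list)
        finished: bool = False

    def should_emit(req: Req, stream_interval: int) -> bool:
        # Per-request cadence: depends on this request's own output length,
        # not on a scheduler-global decode counter shared by all requests.
        is_stream_iter = len(req.output_ids) % stream_interval == 0
        return req.finished or (req.stream and is_stream_iter)

    req = Req(rid="r0", stream=True)
    for token_id in range(1, 17):
        req.output_ids.append(token_id)
        if should_emit(req, stream_interval=8):
            print(f"emit {req.rid} after {len(req.output_ids)} tokens")
    # Prints after 8 and 16 tokens, independent of other requests' progress.

One plausible reading of the diff: a global counter can fire at an arbitrary point in a request's life (for example right after it joins or is retracted and re-added), while the per-request form ties emission to that request's own progress. Note that the old len(req.output_ids) == 1 first-token special case is dropped along with it, so the first partial emission now waits for the per-request interval.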
