response_tokenizer: refactor
This commit is contained in:
@@ -69,105 +69,97 @@ impl<'a> Iterator for ResponseAttributes<'a> {
|
||||
type Item = Result<(&'a str, GenericResponseValue<'a>), ResponseParserError>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.cursor >= self.bytestring.len() {
|
||||
return Some(Err(ResponseParserError::UnexpectedEOF));
|
||||
}
|
||||
if self.bytestring[self.cursor..].starts_with(b"OK") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let remaining = &self.bytestring[self.cursor..];
|
||||
let newline_pos = remaining
|
||||
.iter()
|
||||
.position(|&b| b == b'\n')
|
||||
.unwrap_or(remaining.len());
|
||||
let line = &remaining[..newline_pos];
|
||||
|
||||
// Skip empty lines
|
||||
if line.is_empty() {
|
||||
self.cursor += newline_pos + 1;
|
||||
return self.next();
|
||||
}
|
||||
|
||||
// NOTE: it is important that this happens before any None returns,
|
||||
// so that the iterator advances despite errors.
|
||||
self.cursor += newline_pos + 1;
|
||||
|
||||
let mut keyval = line.splitn(2, |&b| b == b':');
|
||||
let key_bytes = keyval.next()?;
|
||||
|
||||
// TODO: should this be a proper runtime error?
|
||||
debug_assert!(!key_bytes.is_empty());
|
||||
debug_assert!(std::str::from_utf8(key_bytes).is_ok());
|
||||
let key = std::str::from_utf8(key_bytes).ok()?;
|
||||
|
||||
// In the case of binary data, the following value will be the byte count
|
||||
// in decimal, and the actual binary data will follow in the next N bytes,
|
||||
// followed by a newline.
|
||||
//
|
||||
// We parse the number and assign the binary data to the "binary" key.
|
||||
if key == "binary" {
|
||||
let byte_count = match keyval.next() {
|
||||
Some(count) => count.trim_ascii_start(),
|
||||
None => {
|
||||
// TODO: throw more specific error
|
||||
return Some(Err(ResponseParserError::UnexpectedEOF));
|
||||
}
|
||||
};
|
||||
let byte_count_str = match std::str::from_utf8(byte_count) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
// TODO: throw more specific error
|
||||
return Some(Err(ResponseParserError::SyntaxError(
|
||||
0,
|
||||
"Invalid byte count".to_string(),
|
||||
)));
|
||||
}
|
||||
};
|
||||
let byte_count: usize = match byte_count_str.parse() {
|
||||
Ok(n) => n,
|
||||
Err(_) => {
|
||||
// TODO: throw more specific error
|
||||
return Some(Err(ResponseParserError::SyntaxError(
|
||||
0,
|
||||
"Invalid byte count".to_string(),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let value_start = self.cursor;
|
||||
let value_end = self.cursor + byte_count;
|
||||
if value_end > self.bytestring.len() {
|
||||
loop {
|
||||
if self.cursor >= self.bytestring.len() {
|
||||
return Some(Err(ResponseParserError::UnexpectedEOF));
|
||||
}
|
||||
let value_bytes = &self.bytestring[value_start..value_end];
|
||||
|
||||
debug_assert!(
|
||||
self.bytestring[value_end..]
|
||||
.iter()
|
||||
.next()
|
||||
.is_none_or(|&b| b == b'\n')
|
||||
);
|
||||
if self.bytestring[self.cursor..].starts_with(b"OK") {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Skip the binary data and the following newline
|
||||
self.cursor = value_end + 1;
|
||||
let remaining = &self.bytestring[self.cursor..];
|
||||
let newline_pos = remaining
|
||||
.iter()
|
||||
.position(|&b| b == b'\n')
|
||||
.unwrap_or(remaining.len());
|
||||
|
||||
Some(Ok((key, GenericResponseValue::Binary(value_bytes))))
|
||||
} else {
|
||||
let value_bytes = match keyval.next() {
|
||||
Some(v) => v.trim_ascii_start(),
|
||||
None => b"",
|
||||
};
|
||||
// TODO: this should be a proper runtime error, the specification
|
||||
// declares that all string values are UTF-8.
|
||||
debug_assert!(std::str::from_utf8(value_bytes).is_ok());
|
||||
let value_str = std::str::from_utf8(value_bytes).ok()?;
|
||||
let line = &remaining[..newline_pos];
|
||||
|
||||
Some(Ok((key, GenericResponseValue::Text(value_str))))
|
||||
// NOTE: it is important that this happens before any further None returns,
|
||||
// so that the iterator advances despite errors.
|
||||
self.cursor += newline_pos + 1;
|
||||
|
||||
if line.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
return Some(parse_line(line, self));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_line<'a>(
|
||||
line: &'a [u8],
|
||||
state: &mut ResponseAttributes<'a>,
|
||||
) -> Result<(&'a str, GenericResponseValue<'a>), ResponseParserError> {
|
||||
let mut parts = line.splitn(2, |&b| b == b':');
|
||||
|
||||
let key = parts
|
||||
.next()
|
||||
.filter(|k| !k.is_empty())
|
||||
.and_then(|k| std::str::from_utf8(k).ok())
|
||||
.ok_or_else(|| ResponseParserError::SyntaxError(0, "Invalid key".into()))?;
|
||||
|
||||
match key {
|
||||
"binary" => parse_binary(parts.next(), state).map(|v| (key, v)),
|
||||
_ => {
|
||||
let value = parts.next().unwrap_or(b"").trim_ascii_start();
|
||||
|
||||
let text = std::str::from_utf8(value)
|
||||
.map_err(|_| ResponseParserError::SyntaxError(0, "Invalid UTF-8".into()))?;
|
||||
|
||||
Ok((key, GenericResponseValue::Text(text)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_binary<'a>(
|
||||
count_bytes: Option<&[u8]>,
|
||||
state: &mut ResponseAttributes<'a>,
|
||||
) -> Result<GenericResponseValue<'a>, ResponseParserError> {
|
||||
// In the case of binary data, the following value will be the byte count
|
||||
// in decimal, and the actual binary data will follow in the next N bytes,
|
||||
// followed by a newline.
|
||||
//
|
||||
// We parse the number and assign the binary data to the "binary" key.
|
||||
let count = count_bytes
|
||||
.map(|b| b.trim_ascii_start())
|
||||
.and_then(|b| std::str::from_utf8(b).ok())
|
||||
.and_then(|s| s.parse::<usize>().ok())
|
||||
.ok_or_else(|| ResponseParserError::SyntaxError(0, "Invalid byte count".into()))?;
|
||||
|
||||
let start = state.cursor;
|
||||
let end = start + count;
|
||||
|
||||
if end > state.bytestring.len() {
|
||||
return Err(ResponseParserError::UnexpectedEOF);
|
||||
}
|
||||
|
||||
let bytes = &state.bytestring[start..end];
|
||||
|
||||
if state.bytestring.get(end).is_some_and(|&b| b != b'\n') {
|
||||
return Err(ResponseParserError::SyntaxError(
|
||||
0,
|
||||
"Missing newline".into(),
|
||||
));
|
||||
}
|
||||
|
||||
state.cursor = end + 1;
|
||||
|
||||
Ok(GenericResponseValue::Binary(bytes))
|
||||
}
|
||||
|
||||
impl<'a> From<ResponseAttributes<'a>>
|
||||
for Result<HashMap<&'a str, GenericResponseValue<'a>>, ResponseParserError>
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user