From 1bf3323015d52bb8eb2ad33fa4ea3e90d925e9b5 Mon Sep 17 00:00:00 2001 From: Connor Peet Date: Tue, 15 Nov 2022 15:17:48 -0800 Subject: [PATCH] cli: add service integration for macos, observability Adds support for running the tunnel as a service on macOS via launchservices. It also hooks up observability (`code tunnel service log`) on macOS and Linux. On macOS--and later Windows, hence the manual implementation of `tail`--it saves output to a log file and watches it. On Linux, it simply delegates to journalctl. The "tailing" is implemented via simple polling of the file size. I didn't want to pull in a giant set of dependencies for inotify/kqueue/etc just for this use case; performance when polling a single log file is not a huge concern. --- cli/src/bin/code/main.rs | 39 ++-- cli/src/commands/args.rs | 9 +- cli/src/commands/tunnels.rs | 5 +- cli/src/state.rs | 5 + cli/src/tunnels.rs | 2 + cli/src/tunnels/service.rs | 43 +---- cli/src/tunnels/service_linux.rs | 24 +++ cli/src/tunnels/service_macos.rs | 188 ++++++++++++++++++++ cli/src/tunnels/service_windows.rs | 4 + cli/src/util/command.rs | 28 ++- cli/src/util/errors.rs | 35 +++- cli/src/util/io.rs | 276 ++++++++++++++++++++++++++++- 12 files changed, 606 insertions(+), 52 deletions(-) create mode 100644 cli/src/tunnels/service_macos.rs diff --git a/cli/src/bin/code/main.rs b/cli/src/bin/code/main.rs index ce98c0faaf4..5353f5b9121 100644 --- a/cli/src/bin/code/main.rs +++ b/cli/src/bin/code/main.rs @@ -9,6 +9,7 @@ use std::process::Command; use clap::Parser; use cli::{ commands::{args, tunnels, update, version, CommandContext}, + constants::get_default_user_agent, desktop, log as own_log, state::LauncherPaths, util::{ @@ -38,16 +39,12 @@ async fn main() -> Result<(), std::convert::Infallible> { let core = parsed.core(); let context = CommandContext { - http: reqwest::Client::new(), + http: reqwest::ClientBuilder::new() + .user_agent(get_default_user_agent()) + .build() + .unwrap(), paths: LauncherPaths::new(&core.global_options.cli_data_dir).unwrap(), - log: own_log::Logger::new( - SdkTracerProvider::builder().build().tracer("codecli"), - if core.global_options.verbose { - own_log::Level::Trace - } else { - core.global_options.log.unwrap_or(own_log::Level::Info) - }, - ), + log: make_logger(core), args: core.clone(), }; @@ -111,6 +108,23 @@ async fn main() -> Result<(), std::convert::Infallible> { } } +fn make_logger(core: &args::CliCore) -> own_log::Logger { + let log_level = if core.global_options.verbose { + own_log::Level::Trace + } else { + core.global_options.log.unwrap_or(own_log::Level::Info) + }; + + let tracer = SdkTracerProvider::builder().build().tracer("codecli"); + let mut log = own_log::Logger::new(tracer, log_level); + if let Some(f) = &core.global_options.log_to_file { + log = + log.tee(own_log::FileLogSink::new(log_level, f).expect("expected to make file logger")) + } + + log +} + fn print_and_exit(err: E) -> ! where E: std::fmt::Display, @@ -143,7 +157,12 @@ async fn start_code(context: CommandContext, args: Vec) -> Result, + /// Log level to use. #[clap(long, arg_enum, value_name = "level", global = true)] pub log: Option, @@ -596,6 +600,9 @@ pub enum TunnelServiceSubCommands { /// Uninstalls and stops the tunnel service. Uninstall, + /// Shows logs for the running service. + Log, + /// Internal command for running the service #[clap(hide = true)] InternalRun, diff --git a/cli/src/commands/tunnels.rs b/cli/src/commands/tunnels.rs index 25a4601b52a..2d12e7c07a1 100644 --- a/cli/src/commands/tunnels.rs +++ b/cli/src/commands/tunnels.rs @@ -116,13 +116,11 @@ pub async fn service( match service_args { TunnelServiceSubCommands::Install => { // ensure logged in, otherwise subsequent serving will fail - println!("authing"); Auth::new(&ctx.paths, ctx.log.clone()) .get_credential() .await?; // likewise for license consent - println!("consent"); legal::require_consent(&ctx.paths, false)?; let current_exe = @@ -147,6 +145,9 @@ pub async fn service( TunnelServiceSubCommands::Uninstall => { manager.unregister().await?; } + TunnelServiceSubCommands::Log => { + manager.show_logs().await?; + } TunnelServiceSubCommands::InternalRun => { manager .run(ctx.paths.clone(), TunnelServiceContainer::new(ctx.args)) diff --git a/cli/src/state.rs b/cli/src/state.rs index 1a33ea755f8..3e55e139b1b 100644 --- a/cli/src/state.rs +++ b/cli/src/state.rs @@ -137,6 +137,11 @@ impl LauncherPaths { &self.root } + /// Suggested path for tunnel service logs, when using file logs + pub fn service_log_file(&self) -> PathBuf { + self.root.join("tunnel-service.log") + } + /// Removes the launcher data directory. pub fn remove(&self) -> Result<(), WrappedError> { remove_dir_all(&self.root).map_err(|e| { diff --git a/cli/src/tunnels.rs b/cli/src/tunnels.rs index e451dfb6621..d80ad637d3e 100644 --- a/cli/src/tunnels.rs +++ b/cli/src/tunnels.rs @@ -21,6 +21,8 @@ mod service; mod service_linux; #[cfg(target_os = "windows")] mod service_windows; +#[cfg(target_os = "macos")] +mod service_macos; pub use control_server::serve; pub use service::{ diff --git a/cli/src/tunnels/service.rs b/cli/src/tunnels/service.rs index f66fd5d6b9f..b263b6ea6ff 100644 --- a/cli/src/tunnels/service.rs +++ b/cli/src/tunnels/service.rs @@ -40,6 +40,9 @@ pub trait ServiceManager { handle: impl 'static + ServiceContainer, ) -> Result<(), AnyError>; + /// Show logs from the running service to standard out. + async fn show_logs(&self) -> Result<(), AnyError>; + /// Unregisters the current executable as a service. async fn unregister(&self) -> Result<(), AnyError>; } @@ -50,12 +53,16 @@ pub type ServiceManagerImpl = super::service_windows::WindowsService; #[cfg(target_os = "linux")] pub type ServiceManagerImpl = super::service_linux::SystemdService; -#[cfg(not(any(target_os = "windows", target_os = "linux")))] -pub type ServiceManagerImpl = UnimplementedServiceManager; +#[cfg(target_os = "macos")] +pub type ServiceManagerImpl = super::service_macos::LaunchdService; #[allow(unreachable_code)] #[allow(unused_variables)] pub fn create_service_manager(log: log::Logger, paths: &LauncherPaths) -> ServiceManagerImpl { + #[cfg(target_os = "macos")] + { + super::service_macos::LaunchdService::new(log, paths) + } #[cfg(target_os = "windows")] { super::service_windows::WindowsService::new(log) @@ -64,36 +71,4 @@ pub fn create_service_manager(log: log::Logger, paths: &LauncherPaths) -> Servic { super::service_linux::SystemdService::new(log, paths.clone()) } - #[cfg(not(any(target_os = "windows", target_os = "linux")))] - { - UnimplementedServiceManager::new() - } -} - -pub struct UnimplementedServiceManager(); - -#[allow(dead_code)] -impl UnimplementedServiceManager { - fn new() -> Self { - Self() - } -} - -#[async_trait] -impl ServiceManager for UnimplementedServiceManager { - async fn register(&self, _exe: PathBuf, _args: &[&str]) -> Result<(), AnyError> { - unimplemented!("Service management is not supported on this platform"); - } - - async fn run( - self, - _launcher_paths: LauncherPaths, - _handle: impl 'static + ServiceContainer, - ) -> Result<(), AnyError> { - unimplemented!("Service management is not supported on this platform"); - } - - async fn unregister(&self) -> Result<(), AnyError> { - unimplemented!("Service management is not supported on this platform"); - } } diff --git a/cli/src/tunnels/service_linux.rs b/cli/src/tunnels/service_linux.rs index e4f131d6e37..6f0129aa018 100644 --- a/cli/src/tunnels/service_linux.rs +++ b/cli/src/tunnels/service_linux.rs @@ -7,6 +7,7 @@ use std::{ fs::File, io::{self, Write}, path::PathBuf, + process::Command, }; use async_trait::async_trait; @@ -115,6 +116,29 @@ impl ServiceManager for SystemdService { handle.run_service(self.log, launcher_paths, rx).await } + async fn show_logs(&self) -> Result<(), AnyError> { + // show the systemctl status header... + Command::new("systemctl") + .args([ + "--user", + "status", + "-n", + "0", + &SystemdService::service_name_string(), + ]) + .status() + .map(|s| s.code().unwrap_or(1)) + .map_err(|e| wrap(e, format!("error running journalctl")))?; + + // then follow log files + Command::new("journalctl") + .args(["--user", "-f", "-u", &SystemdService::service_name_string()]) + .status() + .map(|s| s.code().unwrap_or(1)) + .map_err(|e| wrap(e, format!("error running journalctl")))?; + Ok(()) + } + async fn unregister(&self) -> Result<(), crate::util::errors::AnyError> { let connection = SystemdService::connect().await?; let proxy = SystemdService::proxy(&connection).await?; diff --git a/cli/src/tunnels/service_macos.rs b/cli/src/tunnels/service_macos.rs new file mode 100644 index 00000000000..f69f83d9cc8 --- /dev/null +++ b/cli/src/tunnels/service_macos.rs @@ -0,0 +1,188 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +use std::{ + fs::{remove_file, File}, + io::{self, Write}, + path::{Path, PathBuf}, +}; + +use async_trait::async_trait; +use tokio::sync::mpsc; + +use crate::{ + commands::tunnels::ShutdownSignal, + constants::APPLICATION_NAME, + log, + state::LauncherPaths, + util::{ + command::capture_command_and_check_status, + errors::{wrap, AnyError, MissingHomeDirectory}, + io::{tailf, TailEvent}, + }, +}; + +use super::ServiceManager; + +pub struct LaunchdService { + log: log::Logger, + log_file: PathBuf, +} + +impl LaunchdService { + pub fn new(log: log::Logger, paths: &LauncherPaths) -> Self { + Self { + log, + log_file: paths.service_log_file(), + } + } +} + +#[async_trait] +impl ServiceManager for LaunchdService { + async fn register( + &self, + exe: std::path::PathBuf, + args: &[&str], + ) -> Result<(), crate::util::errors::AnyError> { + let service_file = get_service_file_path()?; + write_service_file(&service_file, &self.log_file, exe, args) + .map_err(|e| wrap(e, "error creating service file"))?; + + info!(self.log, "Successfully registered service..."); + + capture_command_and_check_status( + "launchctl", + &["load", service_file.as_os_str().to_string_lossy().as_ref()], + ) + .await?; + + capture_command_and_check_status("launchctl", &["start", &get_service_label()]).await?; + + info!(self.log, "Tunnel service successfully started"); + + Ok(()) + } + + async fn show_logs(&self) -> Result<(), AnyError> { + if !self.log_file.exists() { + println!("The tunnel service has not started yet."); + return Ok(()); + } + + let file = + std::fs::File::open(&self.log_file).map_err(|e| wrap(e, "error opening log file"))?; + let mut rx = tailf(file, 20); + while let Some(line) = rx.recv().await { + match line { + TailEvent::Line(l) => print!("{}", l), + TailEvent::Reset => println!("== Tunnel service restarted =="), + TailEvent::Err(e) => return Err(wrap(e, "error reading log file").into()), + } + } + + Ok(()) + } + + async fn run( + self, + launcher_paths: crate::state::LauncherPaths, + mut handle: impl 'static + super::ServiceContainer, + ) -> Result<(), crate::util::errors::AnyError> { + let (tx, rx) = mpsc::channel::(1); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.ok(); + tx.send(ShutdownSignal::CtrlC).await.ok(); + }); + + handle.run_service(self.log, launcher_paths, rx).await + } + + async fn unregister(&self) -> Result<(), crate::util::errors::AnyError> { + let service_file = get_service_file_path()?; + + match capture_command_and_check_status("launchctl", &["stop", &get_service_label()]).await { + Ok(_) => {} + // status 3 == "no such process" + Err(AnyError::CommandFailed(e)) if e.output.status.code() == Some(3) => {} + Err(e) => return Err(e), + }; + + info!(self.log, "Successfully stopped service..."); + + capture_command_and_check_status( + "launchctl", + &[ + "unload", + service_file.as_os_str().to_string_lossy().as_ref(), + ], + ) + .await?; + + info!(self.log, "Tunnel service uninstalled"); + + if let Ok(f) = get_service_file_path() { + remove_file(f).ok(); + } + + Ok(()) + } +} + +fn get_service_label() -> String { + format!("com.visualstudio.{}.tunnel", &*APPLICATION_NAME) +} + +fn get_service_file_path() -> Result { + match dirs::home_dir() { + Some(mut d) => { + d.push(format!("{}.plist", get_service_label())); + Ok(d) + } + None => Err(MissingHomeDirectory()), + } +} + +fn write_service_file( + path: &PathBuf, + log_file: &Path, + exe: std::path::PathBuf, + args: &[&str], +) -> io::Result<()> { + let mut f = File::create(path)?; + let log_file = log_file.as_os_str().to_string_lossy(); + // todo: we may be able to skip file logging and use the ASL instead + // if/when we no longer need to support older macOS versions. + write!( + &mut f, + "\n\ + \n\ + \n\ + \n\ + Label\n\ + {}\n\ + LimitLoadToSessionType\n\ + Aqua\n\ + ProgramArguments\n\ + \n\ + {}\n\ + {}\n\ + \n\ + KeepAlive\n\ + \n\ + StandardErrorPath\n\ + {}\n\ + StandardOutPath\n\ + {}\n\ + \n\ + ", + get_service_label(), + exe.into_os_string().to_string_lossy(), + args.join(""), + log_file, + log_file + )?; + Ok(()) +} diff --git a/cli/src/tunnels/service_windows.rs b/cli/src/tunnels/service_windows.rs index 9e743e89141..44ebc8cd139 100644 --- a/cli/src/tunnels/service_windows.rs +++ b/cli/src/tunnels/service_windows.rs @@ -119,6 +119,10 @@ impl CliServiceManager for WindowsService { Ok(()) } + async fn show_logs(&self) -> Result<(), AnyError> { + todo!(); + } + #[allow(unused_must_use)] // triggers incorrectly on `define_windows_service!` async fn run( self, diff --git a/cli/src/util/command.rs b/cli/src/util/command.rs index 7a7795e1590..7e3192b6399 100644 --- a/cli/src/util/command.rs +++ b/cli/src/util/command.rs @@ -2,10 +2,34 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -use super::errors::{wrap, WrappedError}; -use std::{ffi::OsStr, process::Stdio}; +use super::errors::{wrap, AnyError, CommandFailed, WrappedError}; +use std::{ffi::OsStr, process::Stdio, borrow::Cow}; use tokio::process::Command; +pub async fn capture_command_and_check_status( + command_str: impl AsRef, + args: &[impl AsRef], +) -> Result { + let output = capture_command(&command_str, args).await?; + + if !output.status.success() { + return Err(CommandFailed { + command: format!( + "{} {}", + command_str.as_ref().to_string_lossy(), + args.iter() + .map(|a| a.as_ref().to_string_lossy()) + .collect::>>() + .join(" ") + ), + output, + } + .into()); + } + + Ok(output) +} + pub async fn capture_command( command_str: A, args: I, diff --git a/cli/src/util/errors.rs b/cli/src/util/errors.rs index fe532014dae..46e7e3604b9 100644 --- a/cli/src/util/errors.rs +++ b/cli/src/util/errors.rs @@ -371,6 +371,37 @@ impl std::fmt::Display for CorruptDownload { } } +#[derive(Debug)] +pub struct MissingHomeDirectory(); + +impl std::fmt::Display for MissingHomeDirectory { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "Could not find your home directory. Please ensure this command is running in the context of an normal user.") + } +} + +#[derive(Debug)] +pub struct CommandFailed { + pub output: std::process::Output, + pub command: String, +} + +impl std::fmt::Display for CommandFailed { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "Failed to run command \"{}\" (code {}): {}", + self.command, + self.output.status, + String::from_utf8_lossy(if self.output.stderr.is_empty() { + &self.output.stdout + } else { + &self.output.stderr + }) + ) + } +} + // Makes an "AnyError" enum that contains any of the given errors, in the form // `enum AnyError { FooError(FooError) }` (when given `makeAnyError!(FooError)`). // Useful to easily deal with application error types without making tons of "From" @@ -433,7 +464,9 @@ makeAnyError!( ServiceAlreadyRegistered, WindowsNeedsElevation, UpdatesNotConfigured, - CorruptDownload + CorruptDownload, + MissingHomeDirectory, + CommandFailed ); impl From for AnyError { diff --git a/cli/src/util/io.rs b/cli/src/util/io.rs index 217ec2c74a3..a21a2ceb632 100644 --- a/cli/src/util/io.rs +++ b/cli/src/util/io.rs @@ -2,9 +2,18 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See License.txt in the project root for license information. *--------------------------------------------------------------------------------------------*/ -use std::{io, task::Poll}; +use std::{ + fs::File, + io::{self, BufRead, Seek}, + task::Poll, + time::Duration, +}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + sync::mpsc, + time::sleep, +}; pub trait ReportCopyProgress { fn report_progress(&mut self, bytes_so_far: u64, total_bytes: u64); @@ -95,3 +104,266 @@ impl ReadBuffer { Poll::Ready(Ok(())) } } + +#[derive(Debug)] +pub enum TailEvent { + /// A new line was read from the file. The line includes its trailing newline character. + Line(String), + /// The file appears to have been rewritten (size shrunk) + Reset, + /// An error was encountered with the file. + Err(io::Error), +} + +/// Simple, naive implementation of `tail -f -n `. Uses polling, so +/// it's not the fastest, but simple and working for easy cases. +pub fn tailf(file: File, n: usize) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded_channel(); + let mut last_len = match file.metadata() { + Ok(m) => m.len(), + Err(e) => { + tx.send(TailEvent::Err(e)).ok(); + return rx; + } + }; + + let mut reader = io::BufReader::new(file); + let mut pos = 0; + + // Read the initial "n" lines back from the request. initial_lines + // is a small ring buffer. + let mut initial_lines = Vec::with_capacity(n); + let mut initial_lines_i = 0; + loop { + let mut line = String::new(); + let bytes_read = match reader.read_line(&mut line) { + Ok(0) => break, + Ok(n) => n, + Err(e) => { + tx.send(TailEvent::Err(e)).ok(); + return rx; + } + }; + + if !line.ends_with('\n') { + // EOF + break; + } + + pos += bytes_read as u64; + if initial_lines.len() < initial_lines.capacity() { + initial_lines.push(line) + } else { + initial_lines[initial_lines_i] = line; + } + + initial_lines_i = (initial_lines_i + 1) % n; + } + + // remove tail lines... + if initial_lines_i < initial_lines.len() { + for line in initial_lines.drain((initial_lines_i)..) { + tx.send(TailEvent::Line(line)).ok(); + } + } + // then the remaining lines + if !initial_lines.is_empty() { + for line in initial_lines.drain(0..) { + tx.send(TailEvent::Line(line)).ok(); + } + } + + // now spawn the poll process to keep reading new lines + tokio::spawn(async move { + let poll_interval = Duration::from_millis(500); + + loop { + tokio::select! { + _ = sleep(poll_interval) => {}, + _ = tx.closed() => return + } + + match reader.get_ref().metadata() { + Err(e) => { + tx.send(TailEvent::Err(e)).ok(); + return; + } + Ok(m) => { + if m.len() == last_len { + continue; + } + + if m.len() < last_len { + tx.send(TailEvent::Reset).ok(); + pos = 0; + } + + last_len = m.len(); + } + } + + if let Err(e) = reader.seek(io::SeekFrom::Start(pos)) { + tx.send(TailEvent::Err(e)).ok(); + return; + } + + loop { + let mut line = String::new(); + let n = match reader.read_line(&mut line) { + Ok(0) => break, + Ok(n) => n, + Err(e) => { + tx.send(TailEvent::Err(e)).ok(); + return; + } + }; + + if n == 0 || !line.ends_with('\n') { + break; + } + + pos += n as u64; + if tx.send(TailEvent::Line(line)).is_err() { + return; + } + } + } + }); + + rx +} + +#[cfg(test)] +mod tests { + use rand::Rng; + use std::{fs::OpenOptions, io::Write}; + + use super::*; + + #[tokio::test] + async fn test_tailf_empty() { + let dir = tempfile::tempdir().unwrap(); + let file_path = dir.path().join("tmp"); + + let read_file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(&file_path) + .unwrap(); + + let mut rx = tailf(read_file, 32); + assert!(rx.try_recv().is_err()); + + let mut append_file = OpenOptions::new() + .write(true) + .append(true) + .open(&file_path) + .unwrap(); + writeln!(&mut append_file, "some line").unwrap(); + + let recv = rx.recv().await; + if let Some(TailEvent::Line(l)) = recv { + assert_eq!("some line\n".to_string(), l); + } else { + unreachable!("expect a line event, got {:?}", recv) + } + + write!(&mut append_file, "partial ").unwrap(); + writeln!(&mut append_file, "line").unwrap(); + + let recv = rx.recv().await; + if let Some(TailEvent::Line(l)) = recv { + assert_eq!("partial line\n".to_string(), l); + } else { + unreachable!("expect a line event, got {:?}", recv) + } + } + + #[tokio::test] + async fn test_tailf_resets() { + let dir = tempfile::tempdir().unwrap(); + let file_path = dir.path().join("tmp"); + + let mut read_file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(&file_path) + .unwrap(); + + writeln!(&mut read_file, "some existing content").unwrap(); + let mut rx = tailf(read_file, 0); + assert!(rx.try_recv().is_err()); + + let mut append_file = File::create(&file_path).unwrap(); // truncates + writeln!(&mut append_file, "some line").unwrap(); + + let recv = rx.recv().await; + if let Some(TailEvent::Reset) = recv { + // ok + } else { + unreachable!("expect a reset event, got {:?}", recv) + } + + let recv = rx.recv().await; + if let Some(TailEvent::Line(l)) = recv { + assert_eq!("some line\n".to_string(), l); + } else { + unreachable!("expect a line event, got {:?}", recv) + } + } + + #[tokio::test] + async fn test_tailf_with_data() { + let dir = tempfile::tempdir().unwrap(); + let file_path = dir.path().join("tmp"); + + let mut read_file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(&file_path) + .unwrap(); + let mut rng = rand::thread_rng(); + + let mut written = vec![]; + let base_line = "Elit ipsum cillum ex cillum. Adipisicing consequat cupidatat do proident ut in sunt Lorem ipsum tempor. Eiusmod ipsum Lorem labore exercitation sunt pariatur excepteur fugiat cillum velit cillum enim. Nisi Lorem cupidatat ad enim velit officia eiusmod esse tempor aliquip. Deserunt pariatur tempor in duis culpa esse sit nulla irure ullamco ipsum voluptate non laboris. Occaecat officia nulla officia mollit do aliquip reprehenderit ad incididunt."; + for i in 0..100 { + let line = format!("{}: {}", i, &base_line[..rng.gen_range(0..base_line.len())]); + writeln!(&mut read_file, "{}", line).unwrap(); + written.push(line); + } + write!(&mut read_file, "partial line").unwrap(); + read_file.seek(io::SeekFrom::Start(0)).unwrap(); + + let last_n = 32; + let mut rx = tailf(read_file, last_n); + for i in 0..last_n { + let recv = rx.try_recv().unwrap(); + if let TailEvent::Line(l) = recv { + let mut expected = written[written.len() - last_n + i].to_string(); + expected.push('\n'); + assert_eq!(expected, l); + } else { + unreachable!("expect a line event, got {:?}", recv) + } + } + + assert!(rx.try_recv().is_err()); + + let mut append_file = OpenOptions::new() + .write(true) + .append(true) + .open(&file_path) + .unwrap(); + writeln!(append_file, " is now complete").unwrap(); + + let recv = rx.recv().await; + if let Some(TailEvent::Line(l)) = recv { + assert_eq!("partial line is now complete\n".to_string(), l); + } else { + unreachable!("expect a line event, got {:?}", recv) + } + } +}