| /*** |
| This file is part of systemd. |
| |
| Copyright 2014 Zbigniew Jędrzejewski-Szmek |
| |
| systemd is free software; you can redistribute it and/or modify it |
| under the terms of the GNU Lesser General Public License as published by |
| the Free Software Foundation; either version 2.1 of the License, or |
| (at your option) any later version. |
| |
| systemd is distributed in the hope that it will be useful, but |
| WITHOUT ANY WARRANTY; without even the implied warranty of |
| MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| Lesser General Public License for more details. |
| |
| You should have received a copy of the GNU Lesser General Public License |
| along with systemd; If not, see <http://www.gnu.org/licenses/>. |
| ***/ |
| |
| #include <curl/curl.h> |
| #include <stdbool.h> |
| |
| #include "alloc-util.h" |
| #include "journal-upload.h" |
| #include "log.h" |
| #include "utf8.h" |
| #include "util.h" |
| #include "sd-daemon.h" |
| |
| /** |
| * Write up to size bytes to buf. Return negative on error, and number of |
| * bytes written otherwise. The last case is a kind of an error too. |
| */ |
| static ssize_t write_entry(char *buf, size_t size, Uploader *u) { |
| int r; |
| size_t pos = 0; |
| |
| assert(size <= SSIZE_MAX); |
| |
| for (;;) { |
| |
| switch(u->entry_state) { |
| case ENTRY_CURSOR: { |
| u->current_cursor = mfree(u->current_cursor); |
| |
| r = sd_journal_get_cursor(u->journal, &u->current_cursor); |
| if (r < 0) |
| return log_error_errno(r, "Failed to get cursor: %m"); |
| |
| r = snprintf(buf + pos, size - pos, |
| "__CURSOR=%s\n", u->current_cursor); |
| if (pos + r > size) |
| /* not enough space */ |
| return pos; |
| |
| u->entry_state++; |
| |
| if (pos + r == size) { |
| /* exactly one character short, but we don't need it */ |
| buf[size - 1] = '\n'; |
| return size; |
| } |
| |
| pos += r; |
| } /* fall through */ |
| |
| case ENTRY_REALTIME: { |
| usec_t realtime; |
| |
| r = sd_journal_get_realtime_usec(u->journal, &realtime); |
| if (r < 0) |
| return log_error_errno(r, "Failed to get realtime timestamp: %m"); |
| |
| r = snprintf(buf + pos, size - pos, |
| "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); |
| if (r + pos > size) |
| /* not enough space */ |
| return pos; |
| |
| u->entry_state++; |
| |
| if (r + pos == size) { |
| /* exactly one character short, but we don't need it */ |
| buf[size - 1] = '\n'; |
| return size; |
| } |
| |
| pos += r; |
| } /* fall through */ |
| |
| case ENTRY_MONOTONIC: { |
| usec_t monotonic; |
| sd_id128_t boot_id; |
| |
| r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); |
| if (r < 0) |
| return log_error_errno(r, "Failed to get monotonic timestamp: %m"); |
| |
| r = snprintf(buf + pos, size - pos, |
| "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); |
| if (r + pos > size) |
| /* not enough space */ |
| return pos; |
| |
| u->entry_state++; |
| |
| if (r + pos == size) { |
| /* exactly one character short, but we don't need it */ |
| buf[size - 1] = '\n'; |
| return size; |
| } |
| |
| pos += r; |
| } /* fall through */ |
| |
| case ENTRY_BOOT_ID: { |
| sd_id128_t boot_id; |
| char sid[33]; |
| |
| r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); |
| if (r < 0) |
| return log_error_errno(r, "Failed to get monotonic timestamp: %m"); |
| |
| r = snprintf(buf + pos, size - pos, |
| "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); |
| if (r + pos > size) |
| /* not enough space */ |
| return pos; |
| |
| u->entry_state++; |
| |
| if (r + pos == size) { |
| /* exactly one character short, but we don't need it */ |
| buf[size - 1] = '\n'; |
| return size; |
| } |
| |
| pos += r; |
| } /* fall through */ |
| |
| case ENTRY_NEW_FIELD: { |
| u->field_pos = 0; |
| |
| r = sd_journal_enumerate_data(u->journal, |
| &u->field_data, |
| &u->field_length); |
| if (r < 0) |
| return log_error_errno(r, "Failed to move to next field in entry: %m"); |
| else if (r == 0) { |
| u->entry_state = ENTRY_OUTRO; |
| continue; |
| } |
| |
| if (!utf8_is_printable_newline(u->field_data, |
| u->field_length, false)) { |
| u->entry_state = ENTRY_BINARY_FIELD_START; |
| continue; |
| } |
| |
| u->entry_state++; |
| } /* fall through */ |
| |
| case ENTRY_TEXT_FIELD: |
| case ENTRY_BINARY_FIELD: { |
| bool done; |
| size_t tocopy; |
| |
| done = size - pos > u->field_length - u->field_pos; |
| if (done) |
| tocopy = u->field_length - u->field_pos; |
| else |
| tocopy = size - pos; |
| |
| memcpy(buf + pos, |
| (char*) u->field_data + u->field_pos, |
| tocopy); |
| |
| if (done) { |
| buf[pos + tocopy] = '\n'; |
| pos += tocopy + 1; |
| u->entry_state = ENTRY_NEW_FIELD; |
| continue; |
| } else { |
| u->field_pos += tocopy; |
| return size; |
| } |
| } |
| |
| case ENTRY_BINARY_FIELD_START: { |
| const char *c; |
| size_t len; |
| |
| c = memchr(u->field_data, '=', u->field_length); |
| if (!c || c == u->field_data) { |
| log_error("Invalid field."); |
| return -EINVAL; |
| } |
| |
| len = c - (const char*)u->field_data; |
| |
| /* need space for label + '\n' */ |
| if (size - pos < len + 1) |
| return pos; |
| |
| memcpy(buf + pos, u->field_data, len); |
| buf[pos + len] = '\n'; |
| pos += len + 1; |
| |
| u->field_pos = len + 1; |
| u->entry_state++; |
| } /* fall through */ |
| |
| case ENTRY_BINARY_FIELD_SIZE: { |
| uint64_t le64; |
| |
| /* need space for uint64_t */ |
| if (size - pos < 8) |
| return pos; |
| |
| le64 = htole64(u->field_length - u->field_pos); |
| memcpy(buf + pos, &le64, 8); |
| pos += 8; |
| |
| u->entry_state++; |
| continue; |
| } |
| |
| case ENTRY_OUTRO: |
| /* need space for '\n' */ |
| if (size - pos < 1) |
| return pos; |
| |
| buf[pos++] = '\n'; |
| u->entry_state++; |
| u->entries_sent++; |
| |
| return pos; |
| |
| default: |
| assert_not_reached("WTF?"); |
| } |
| } |
| assert_not_reached("WTF?"); |
| } |
| |
| static inline void check_update_watchdog(Uploader *u) { |
| usec_t after; |
| usec_t elapsed_time; |
| |
| if (u->watchdog_usec <= 0) |
| return; |
| |
| after = now(CLOCK_MONOTONIC); |
| elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp); |
| if (elapsed_time > u->watchdog_usec / 2) { |
| log_debug("Update watchdog timer"); |
| sd_notify(false, "WATCHDOG=1"); |
| u->watchdog_timestamp = after; |
| } |
| } |
| |
| static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { |
| Uploader *u = userp; |
| int r; |
| sd_journal *j; |
| size_t filled = 0; |
| ssize_t w; |
| |
| assert(u); |
| assert(nmemb <= SSIZE_MAX / size); |
| |
| check_update_watchdog(u); |
| |
| j = u->journal; |
| |
| while (j && filled < size * nmemb) { |
| if (u->entry_state == ENTRY_DONE) { |
| r = sd_journal_next(j); |
| if (r < 0) { |
| log_error_errno(r, "Failed to move to next entry in journal: %m"); |
| return CURL_READFUNC_ABORT; |
| } else if (r == 0) { |
| if (u->input_event) |
| log_debug("No more entries, waiting for journal."); |
| else { |
| log_info("No more entries, closing journal."); |
| close_journal_input(u); |
| } |
| |
| u->uploading = false; |
| |
| break; |
| } |
| |
| u->entry_state = ENTRY_CURSOR; |
| } |
| |
| w = write_entry((char*)buf + filled, size * nmemb - filled, u); |
| if (w < 0) |
| return CURL_READFUNC_ABORT; |
| filled += w; |
| |
| if (filled == 0) { |
| log_error("Buffer space is too small to write entry."); |
| return CURL_READFUNC_ABORT; |
| } else if (u->entry_state != ENTRY_DONE) |
| /* This means that all available space was used up */ |
| break; |
| |
| log_debug("Entry %zu (%s) has been uploaded.", |
| u->entries_sent, u->current_cursor); |
| } |
| |
| return filled; |
| } |
| |
| void close_journal_input(Uploader *u) { |
| assert(u); |
| |
| if (u->journal) { |
| log_debug("Closing journal input."); |
| |
| sd_journal_close(u->journal); |
| u->journal = NULL; |
| } |
| u->timeout = 0; |
| } |
| |
| static int process_journal_input(Uploader *u, int skip) { |
| int r; |
| |
| if (u->uploading) |
| return 0; |
| |
| r = sd_journal_next_skip(u->journal, skip); |
| if (r < 0) |
| return log_error_errno(r, "Failed to skip to next entry: %m"); |
| else if (r < skip) |
| return 0; |
| |
| /* have data */ |
| u->entry_state = ENTRY_CURSOR; |
| return start_upload(u, journal_input_callback, u); |
| } |
| |
| int check_journal_input(Uploader *u) { |
| if (u->input_event) { |
| int r; |
| |
| r = sd_journal_process(u->journal); |
| if (r < 0) { |
| log_error_errno(r, "Failed to process journal: %m"); |
| close_journal_input(u); |
| return r; |
| } |
| |
| if (r == SD_JOURNAL_NOP) |
| return 0; |
| } |
| |
| return process_journal_input(u, 1); |
| } |
| |
| static int dispatch_journal_input(sd_event_source *event, |
| int fd, |
| uint32_t revents, |
| void *userp) { |
| Uploader *u = userp; |
| |
| assert(u); |
| |
| if (u->uploading) |
| return 0; |
| |
| log_debug("Detected journal input, checking for new data."); |
| return check_journal_input(u); |
| } |
| |
| int open_journal_for_upload(Uploader *u, |
| sd_journal *j, |
| const char *cursor, |
| bool after_cursor, |
| bool follow) { |
| int fd, r, events; |
| |
| u->journal = j; |
| |
| sd_journal_set_data_threshold(j, 0); |
| |
| if (follow) { |
| fd = sd_journal_get_fd(j); |
| if (fd < 0) |
| return log_error_errno(fd, "sd_journal_get_fd failed: %m"); |
| |
| events = sd_journal_get_events(j); |
| |
| r = sd_journal_reliable_fd(j); |
| assert(r >= 0); |
| if (r > 0) |
| u->timeout = -1; |
| else |
| u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; |
| |
| r = sd_event_add_io(u->events, &u->input_event, |
| fd, events, dispatch_journal_input, u); |
| if (r < 0) |
| return log_error_errno(r, "Failed to register input event: %m"); |
| |
| log_debug("Listening for journal events on fd:%d, timeout %d", |
| fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); |
| } else |
| log_debug("Not listening for journal events."); |
| |
| if (cursor) { |
| r = sd_journal_seek_cursor(j, cursor); |
| if (r < 0) |
| return log_error_errno(r, "Failed to seek to cursor %s: %m", |
| cursor); |
| } |
| |
| return process_journal_input(u, 1 + !!after_cursor); |
| } |