Skip to content

Add a string (or stream) API for CSV #47

@agladysh

Description

@agladysh

I was looking for an easy way to handle CSV as strings (not files) today, and had to hack together a solution based on query_csv(), below.

That could be a documentation issue (or even lack of attention on my end), but it seems that there are low-level (browser) APIs and a high-level (node) API, but no middle ground.

It might be nice to expose some friendly API to support middle ground use cases. The boilerplate is rather long now.

Or, for my case, just support CSV strings, if that is in scope of the library.

If I miss something and there is an easier solution, please do share. :-)

Sorry for the typescript, if you're interested, I can try to submit a pure-JS PR, provided you share your preferences on the API design.

interface QueryFileOptions {
  /**
   * Whether the CSV file has a header row.
   * Default false.
   */
  hasHeader: boolean;

  /**
   * Comment prefix to treat lines as comments.
   * Lines starting with this prefix will be ignored.
   */
  commentPrefix: string;

  /**
   * Input CSV encoding. Default 'utf-8'.
   */
  encoding: 'utf-8' | 'latin-1' | 'binary';

  /**
   * Input CSV field delimiter. Default ','.
   */
  inputDelimiter: string;

  /**
   * Input CSV parsing policy. Default 'quoted_rfc'.
   */
  inputPolicy: 'simple' | 'quoted' | 'quoted_rfc';

  /**
   * Output CSV field delimiter. Default `inputDelimiter`.
   */
  outputDelimiter: string;

  /**
   * Output CSV formatting policy. Default `inputPolicy`.
   */
  outputPolicy: 'simple' | 'quoted' | 'quoted_rfc';
}

function loadOptions(query: string, partialOptions?: Partial<QueryFileOptions>): QueryFileOptions {
  const defaultOptions: Partial<QueryFileOptions> = {
    hasHeader: true,
    encoding: 'utf-8',
    inputDelimiter: ',',
    inputPolicy: 'quoted_rfc'
  };

  const options = { ...defaultOptions, ...partialOptions };
  options.outputDelimiter ??= options.inputDelimiter;
  options.outputPolicy ??= options.inputPolicy;

  if (options.encoding === 'latin-1') {
    options.encoding = 'binary';
  }

  if (options.inputDelimiter === '"' && options.inputPolicy === 'quoted') {
    throw new RbqlIOHandlingError('Double quote delimiter is incompatible with "quoted" policy');
  }

  if (!isAscii(query) && options.encoding === 'binary') {
    throw new RbqlIOHandlingError(
      'To use non-ascii characters in query enable UTF-8 encoding instead of latin-1/binary'
    );
  }
  if ((!isAscii(options.inputDelimiter!) || !isAscii(options.outputDelimiter!)) && options.encoding === 'binary') {
    throw new RbqlIOHandlingError(
      'To use non-ascii characters in query enable UTF-8 encoding instead of latin-1/binary'
    );
  }

  return options as QueryFileOptions;
}

// TODO: Extend for several tables?
class JoinRegistry implements RBQLJoinTableRegistry {
  private recordIterator: RBQLInputIterator;

  constructor(recordIterator: RBQLInputIterator) {
    this.recordIterator = recordIterator;
  }

  get_iterator_by_table_id(tableId: string) {
    if (tableId !== 'b') {
      throw new RbqlIOHandlingError(`Unable to find join table "${tableId}"`);
    }

    return this.recordIterator;
  }

  // TODO: Implement?
  get_warnings(_warnings: string[]) {  }
}

async function queryImpl(
  query: string,
  inputStream: Readable,
  outputStream: Writable,
  options: QueryFileOptions,
  joinStream?: Readable
): Promise<void> {
  const inputIterator = new CSVRecordIterator(
    inputStream,
    null,
    options.encoding,
    options.inputDelimiter,
    options.inputPolicy,
    options.hasHeader,
    null,
    'a',
    'a'
  );

  const outputWriter = new CSVWriter(
    outputStream,
    true,
    options.encoding,
    options.outputDelimiter,
    options.outputPolicy
  );

  const joinRegistry = joinStream && new JoinRegistry(
    new CSVRecordIterator(
      joinStream,
      null,
      options.encoding,
      options.inputDelimiter,
      options.inputPolicy,
      options.hasHeader,
      null,
      'b',
      'b'
    )
  );

  // TODO: Handle warnings
  const warnings: string[] = [ ];
  // Promise passthrough, no await.
  return rbql.query(query, inputIterator, outputWriter, warnings, joinRegistry);
}

  async function query(
    data: string,
    query: string,
    joinData?: string,
    partialOptions?: Partial<QueryFileOptions>
  ): Promise<string> {
    const options = loadOptions(query, partialOptions);

    const inputStream = Readable.from(data, { objectMode: false });
    const outputStream = new WritableStreamBuffer();
    const joinStream = joinData ? Readable.from(joinData, { objectMode: false }) : undefined;

    await queryImpl(query, inputStream, outputStream, options, joinStream);

    // TODO: Get rid of this ||.
    return outputStream.getContentsAsString(options.encoding) || '(Error!)';
  }
}

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions