def get_best_schema_from_csv()

in notebooks/Utilities/finspace.py [0:0]


    def get_best_schema_from_csv(self, path, is_s3=True, read_rows=500, sep=','):
        """
        Uses multiple reads of the file with pandas to determine schema of the referenced files.
        Files are expected to be csv.

        The first ``read_rows`` rows are read three times:

        1. plainly, to count the columns;
        2. asking the reader to parse every column as a date, so that
           ``self.get_date_cols`` can report which columns to treat as dates;
        3. parsing only those columns as dates; this frame is passed to
           ``self.get_schema_from_pandas``.

        :param path: path to the files to read
        :type path: str

        :param is_s3: True if the path is s3 (read with awswrangler);  False if filesystem (read with pandas)
        :type is_s3: bool

        :param read_rows: number of rows to sample for determining schema
        :type read_rows: int

        :param sep: column delimiter handed to the csv reader
        :type sep: str

        :return: schema for FinSpace, as returned by ``self.get_schema_from_pandas``
        """
        # best efforts to determine the schema, sight unseen
        if is_s3:
            # imported only on the s3 path so filesystem reads do not need awswrangler installed
            import awswrangler as wr

            read_csv = wr.s3.read_csv
        else:
            read_csv = pd.read_csv

        # every read samples the same rows with the same delimiter
        sample_kwargs = {'nrows': read_rows, 'sep': sep}

        # infer_datetime_format is deprecated since pandas 2.0 (strict format inference is
        # the default there), so only pass it to older pandas where it still matters.
        date_kwargs = dict(sample_kwargs)
        pandas_major = pd.__version__.split('.', 1)[0]
        if pandas_major.isdigit() and int(pandas_major) < 2:
            date_kwargs['infer_datetime_format'] = True

        # 1: get the base schema (only the column count is needed)
        num_cols = len(read_csv(path, **sample_kwargs).columns)

        # 2: with number of columns, try to infer dates by parsing every column as a date
        all_dates_df = read_csv(path, parse_dates=list(range(num_cols)), **date_kwargs)
        date_cols = self.get_date_cols(all_dates_df)

        # 3: with dates known, re-read the sample (still limited to read_rows rows)
        df = read_csv(path, parse_dates=date_cols, **date_kwargs)

        return self.get_schema_from_pandas(df)